// Source: open-consul/agent/grpc-external/services/peerstream/replication.go
// (664 lines, 21 KiB, Go)

package peerstream
import (
"errors"
"fmt"
"strings"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/protobuf/proto"
newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
// NOTE: the proto packages imported below live under proto/private as of the
// "Protobuf Refactoring for Multi-Module Cleanliness" change (#16302, 2023-02-17).
// That change:
//   - moved all packages that were within proto/ to proto/private,
//   - rewrote imports to account for the packages being moved,
//   - added buf.work.yaml to enable buf workspaces,
//   - named the proto-public buf module so that the Go package imports can be
//     overridden within proto/buf.yaml,
//   - bumped the buf version dependency to 1.14.0.
//
// Why: other protobuf dependencies (such as the Google HTTP annotations for
// openapi generation or grpc-gateway usage) need to be consumable alongside the
// project's own rate-limiting annotations, and buf workspaces are the solution.
// With buf workspaces each module's generated Go code embeds proto file names
// relative to the proto dir rather than the repo root, which caused proto file
// name conflicts in the Go global protobuf type registry; adding the private/
// directory to the path avoids those conflicts.
//
// Safety: the gRPC wire protocol does not appear to depend on proto file names
// (the Go grpc code only records the file name as Metadata in the ServiceDesc),
// and apart from imports no generated code changed.
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/pbpeerstream"
"github.com/hashicorp/consul/proto/private/pbservice"
"github.com/hashicorp/consul/proto/private/pbstatus"
"github.com/hashicorp/consul/types"
)
/*
TODO(peering):

If we ever fail to apply a replication message we should either tear
down the entire connection (and thus force a resync on reconnect) or
request a resync operation.
*/
// makeExportedServiceListResponse builds the replication response that carries
// the full list of exported services to the peer cluster.
//
// update.Result must be a *pbpeerstream.ExportedServiceList; any other type is
// reported as an error. As a side effect the service names found in the list
// are recorded on mst through SetExportedServices.
func makeExportedServiceListResponse(
	mst *MutableStatus,
	update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
	list, isList := update.Result.(*pbpeerstream.ExportedServiceList)
	if !isList {
		return nil, fmt.Errorf("invalid type for exported service list response: %T", update.Result)
	}

	payload, _, err := marshalToProtoAny[*pbpeerstream.ExportedServiceList](list)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal: %w", err)
	}

	// Deliberately left nil when the list is empty so that the status receives
	// exactly what the loop produced.
	var exported []structs.ServiceName
	for _, name := range list.Services {
		exported = append(exported, structs.ServiceNameFromString(name))
	}
	mst.SetExportedServices(exported)

	resp := &pbpeerstream.ReplicationMessage_Response{
		ResourceURL: pbpeerstream.TypeURLExportedServiceList,
		ResourceID:  subExportedServiceList,
		Operation:   pbpeerstream.Operation_OPERATION_UPSERT,
		Resource:    payload,
	}
	return resp, nil
}
// makeServiceResponse builds the replication response that carries the
// instances of a single exported service to the peer cluster.
//
// update.Result must be a *pbservice.IndexedCheckServiceNodes holding the
// instances for one service name. The response is always an upsert, even when
// the event has no instances; an empty instance list is meant to signal a
// de-registration to the receiving side.
func makeServiceResponse(update cache.UpdateEvent) (*pbpeerstream.ReplicationMessage_Response, error) {
	instances, isInstances := update.Result.(*pbservice.IndexedCheckServiceNodes)
	if !isInstances {
		return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
	}

	payload, err := anypb.New(&pbpeerstream.ExportedService{Nodes: instances.Nodes})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal: %w", err)
	}

	resp := &pbpeerstream.ReplicationMessage_Response{
		ResourceURL: pbpeerstream.TypeURLExportedService,
		// The resource ID is the correlation ID with the subscription prefix removed.
		ResourceID: strings.TrimPrefix(update.CorrelationID, subExportedService),
		Operation:  pbpeerstream.Operation_OPERATION_UPSERT,
		Resource:   payload,
	}
	return resp, nil
}
// makeCARootsResponse builds the upsert response that carries the peering
// trust bundle to the peer cluster. update.Result must be a
// *pbpeering.PeeringTrustBundle, otherwise an error is returned.
func makeCARootsResponse(
	update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
	payload, _, err := marshalToProtoAny[*pbpeering.PeeringTrustBundle](update.Result)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal: %w", err)
	}

	resp := &pbpeerstream.ReplicationMessage_Response{
		ResourceURL: pbpeerstream.TypeURLPeeringTrustBundle,
		ResourceID:  "roots",
		Operation:   pbpeerstream.Operation_OPERATION_UPSERT,
		Resource:    payload,
	}
	return resp, nil
}
// makeServerAddrsResponse builds the upsert response that carries the peering
// server addresses to the peer cluster. update.Result must be a
// *pbpeering.PeeringServerAddresses, otherwise an error is returned.
func makeServerAddrsResponse(
	update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
	payload, _, err := marshalToProtoAny[*pbpeering.PeeringServerAddresses](update.Result)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal: %w", err)
	}

	resp := &pbpeerstream.ReplicationMessage_Response{
		ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
		ResourceID:  "server-addrs",
		Operation:   pbpeerstream.Operation_OPERATION_UPSERT,
		Resource:    payload,
	}
	return resp, nil
}
// marshalToProtoAny asserts that in holds a value of type T and packs it into
// a protobuf Any.
//
// It returns the packed Any, the value asserted to T, and an error. An error is
// returned when in is not of type T or when packing fails; in both cases the
// returned Any is nil.
func marshalToProtoAny[T newproto.Message](in any) (*anypb.Any, T, error) {
	msg, isT := in.(T)
	if !isT {
		// want only exists so that %T can print the expected type.
		var want T
		return nil, msg, fmt.Errorf("input type is not %T: %T", want, in)
	}

	packed, err := anypb.New(msg)
	if err != nil {
		return nil, msg, err
	}
	return packed, msg, nil
}
func (s *Server) processResponse(
peerName string,
partition string,
mutableStatus *MutableStatus,
resp *pbpeerstream.ReplicationMessage_Response,
) (*pbpeerstream.ReplicationMessage, error) {
if !pbpeerstream.KnownTypeURL(resp.ResourceURL) {
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
if resp.Nonce == "" {
err := fmt.Errorf("received response without a nonce for: %s:%s", resp.ResourceURL, resp.ResourceID)
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
switch resp.Operation {
case pbpeerstream.Operation_OPERATION_UPSERT:
if resp.Resource == nil {
err := fmt.Errorf("received upsert response with no content")
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
err.Error(),
), err
}
2022-08-22 14:09:47 +00:00
if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INTERNAL,
fmt.Sprintf("upsert error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
), fmt.Errorf("upsert error: %w", err)
}
return makeACKReply(resp.ResourceURL, resp.Nonce), nil
default:
var errMsg string
if op := pbpeerstream.Operation_name[int32(resp.Operation)]; op != "" {
errMsg = fmt.Sprintf("unsupported operation: %q", op)
} else {
errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation)
}
return makeNACKReply(
resp.ResourceURL,
resp.Nonce,
code.Code_INVALID_ARGUMENT,
errMsg,
), errors.New(errMsg)
}
}
// handleUpsert decodes an upserted resource according to resourceURL and hands
// it to the handler for that resource type.
//
// The Any's own type URL must equal resourceURL. Supported resource types are
// the exported service list, an exported service, the peering trust bundle and
// the peering server addresses; any other URL results in an error.
func (s *Server) handleUpsert(
	peerName string,
	partition string,
	mutableStatus *MutableStatus,
	resourceURL string,
	resourceID string,
	resource *anypb.Any,
) error {
	if typeURL := resource.TypeUrl; typeURL != resourceURL {
		return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, typeURL)
	}

	switch resourceURL {
	case pbpeerstream.TypeURLExportedServiceList:
		list := new(pbpeerstream.ExportedServiceList)
		if err := resource.UnmarshalTo(list); err != nil {
			return fmt.Errorf("failed to unmarshal resource: %w", err)
		}
		if err := s.handleUpsertExportedServiceList(mutableStatus, peerName, partition, list); err != nil {
			return fmt.Errorf("did not update imported services based on the exported service list event: %w", err)
		}
		return nil

	case pbpeerstream.TypeURLExportedService:
		// The resource ID is the service name; it is placed in the importing partition.
		svcName := structs.ServiceNameFromString(resourceID)
		svcName.OverridePartition(partition)

		exported := new(pbpeerstream.ExportedService)
		if err := resource.UnmarshalTo(exported); err != nil {
			return fmt.Errorf("failed to unmarshal resource: %w", err)
		}
		if err := s.handleUpdateService(peerName, partition, svcName, exported); err != nil {
			return fmt.Errorf("did not increment imported services count for service=%q: %w", svcName.String(), err)
		}
		return nil

	case pbpeerstream.TypeURLPeeringTrustBundle:
		bundle := new(pbpeering.PeeringTrustBundle)
		if err := resource.UnmarshalTo(bundle); err != nil {
			return fmt.Errorf("failed to unmarshal resource: %w", err)
		}
		return s.handleUpsertRoots(peerName, partition, bundle)

	case pbpeerstream.TypeURLPeeringServerAddresses:
		serverAddrs := new(pbpeering.PeeringServerAddresses)
		if err := resource.UnmarshalTo(serverAddrs); err != nil {
			return fmt.Errorf("failed to unmarshal resource: %w", err)
		}
		return s.handleUpsertServerAddrs(peerName, partition, serverAddrs)
	}

	return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
}
// handleUpsertExportedServiceList applies a full exported-service list received
// from peerName.
//
// Every service stored for the peer that is neither in the list nor the
// synthetic sidecar proxy of a listed service is deleted by reconciling it
// against a nil export. Afterwards the listed service names are recorded on
// mutableStatus through SetImportedServices.
func (s *Server) handleUpsertExportedServiceList(
	mutableStatus *MutableStatus,
	peerName string,
	partition string,
	export *pbpeerstream.ExportedServiceList,
) error {
	var imported []structs.ServiceName
	keep := make(map[structs.ServiceName]struct{})

	for _, name := range export.Services {
		svc := structs.ServiceNameFromString(name)
		svc.OverridePartition(partition)
		keep[svc] = struct{}{}
		imported = append(imported, svc)

		// Also keep the sidecar of each exported service so that it is not
		// deleted by the cleanup loop below.
		sidecar := structs.ServiceNameFromString(name + syntheticProxyNameSuffix)
		sidecar.OverridePartition(partition)
		keep[sidecar] = struct{}{}
	}

	entMeta := structs.NodeEnterpriseMetaInPartition(partition)
	_, stored, err := s.GetStore().ServiceList(nil, entMeta, peerName)
	if err != nil {
		return err
	}

	for _, svc := range stored {
		if _, stillExported := keep[svc]; stillExported {
			continue
		}
		if err := s.handleUpdateService(peerName, partition, svc, nil); err != nil {
			return fmt.Errorf("failed to delete unexported service: %w", err)
		}
	}

	mutableStatus.SetImportedServices(imported)
	return nil
}
// handleUpdateService reconciles the catalog entries imported from peerName for
// the service sn against the instances carried in export. It handles both
// deletion and upsert events for a service.
//
// On an UPSERT event (export is non-nil):
// - Nodes, services, and checks in export that are missing from, or differ from, what is
// stored are registered through s.Backend.CatalogRegister; unchanged ones are skipped.
// - Services and checks stored for this service name that were not in export get
// deregistered, and nodes left without any service are deregistered as well.
//
// On a DELETE event (export is nil):
// - The reconciliation runs against an empty instance list, which leads to deleting all
// stored catalog resources associated with the service name.
//
// The first failing read, conversion, registration, or deregistration aborts the
// reconciliation and is returned wrapped; nothing in this function rolls back
// writes that were already issued.
func (s *Server) handleUpdateService(
peerName string,
partition string,
sn structs.ServiceName,
export *pbpeerstream.ExportedService,
) error {
// Capture instances in the state store for reconciliation later.
_, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
if err != nil {
return fmt.Errorf("failed to read imported services: %w", err)
}
// A nil export leaves structsNodes empty, which makes the snapshot below empty too.
structsNodes := []structs.CheckServiceNode{}
if export != nil {
structsNodes, err = export.CheckServiceNodesToStruct()
if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
}
}
// Normalize the data into a convenient form for operation.
snap := newHealthSnapshot(structsNodes, partition, peerName)
// The stored maps are keyed by node ID (plus service ID / check ID).
storedNodesMap, storedSvcInstMap, storedChecksMap := buildStoredMap(storedInstances)
for _, nodeSnap := range snap.Nodes {
// First register the node - skip the unchanged ones
changed := true
if storedNode, ok := storedNodesMap[nodeSnap.Node.ID]; ok {
if storedNode.IsSame(nodeSnap.Node) {
changed = false
}
}
// req is built once per node and reused for the service and check
// registrations below.
req := nodeSnap.Node.ToRegisterRequest()
if changed {
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register node: %w", err)
}
}
// Then register all services on that node - skip the unchanged ones
for _, svcSnap := range nodeSnap.Services {
changed = true
if storedSvcInst, ok := storedSvcInstMap[makeNodeSvcInstID(nodeSnap.Node.ID, svcSnap.Service.ID)]; ok {
if storedSvcInst.IsSame(svcSnap.Service) {
changed = false
}
}
if changed {
req.Service = svcSnap.Service
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register service: %w", err)
}
}
}
// Clear the service so that the check registration below does not carry
// the last registered service along.
req.Service = nil
// Then register all checks on that node - skip the unchanged ones
var chks structs.HealthChecks
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
// This changed is a new variable scoped to the check loop; it shadows the one above.
changed := true
if chk, ok := storedChecksMap[makeNodeCheckID(nodeSnap.Node.ID, svcSnap.Service.ID, c.CheckID)]; ok {
if chk.IsSame(c) {
changed = false
}
}
if changed {
chks = append(chks, c)
}
}
}
// All changed checks of the node are registered in a single request.
if len(chks) > 0 {
req.Checks = chks
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register check: %w", err)
}
}
}
//
// Now that the data received has been stored in the state store, the rest of this
// function is responsible for cleaning up data in the catalog that wasn't in the snapshot.
//
// nodeCheckTuple uniquely identifies a node check in the catalog.
// The partition is not needed because we are only operating on one partition's catalog.
type nodeCheckTuple struct {
checkID types.CheckID
node string
}
var (
// unusedNodes tracks node names that were not present in the latest response.
// Missing nodes are not assumed to be deleted because there may be other service names
// registered on them.
// It is a plain set of node names; node checks are tracked separately in deletedNodeChecks.
unusedNodes = make(map[string]struct{})
// deletedNodeChecks tracks node checks that were not present in the latest response.
// A single node check will be attached to all service instances of a node, so this
// deduplication prevents issuing multiple deregistrations for a single check.
deletedNodeChecks = make(map[nodeCheckTuple]struct{})
)
for _, csn := range storedInstances {
// Note that the snapshot is looked up by node name here, whereas the stored
// maps used above are keyed by node ID.
if _, ok := snap.Nodes[csn.Node.Node]; !ok {
unusedNodes[csn.Node.Node] = struct{}{}
// Since the node is not in the snapshot we can know the associated service
// instance is not in the snapshot either, since a service instance can't
// exist without a node.
// This will also delete all service checks.
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err)
}
// We can't know if a node check was deleted from the exporting cluster
// (but not the node itself) if the node wasn't in the snapshot,
// so we do not loop over checks here.
// If the unusedNode gets deleted below that will also delete node checks.
continue
}
// Delete the service instance if not in the snapshot.
sid := csn.Service.CompoundServiceID()
if _, ok := snap.Nodes[csn.Node.Node].Services[sid]; !ok {
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/service_id:%s",
csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.NamespaceOrDefault(), csn.Service.ID)
return fmt.Errorf("failed to deregister service %q: %w", ident, err)
}
// When a service is deleted all associated checks also get deleted as a side effect.
continue
}
// Reconcile checks.
for _, chk := range csn.Checks {
if _, ok := snap.Nodes[csn.Node.Node].Services[sid].Checks[chk.CheckID]; !ok {
// Checks without a ServiceID are node checks.
// If the node exists but the check does not then the check was deleted.
if chk.ServiceID == "" {
// Deduplicate node checks to avoid deregistering a check multiple times.
// The actual deregistration happens after this loop.
tuple := nodeCheckTuple{
checkID: chk.CheckID,
node: chk.Node,
}
deletedNodeChecks[tuple] = struct{}{}
continue
}
// If the check isn't a node check then it's a service check.
// Service checks that were not present can be deleted immediately because
// checks for a given service ID will only be attached to a single CheckServiceNode.
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: chk.Node,
CheckID: chk.CheckID,
EnterpriseMeta: chk.EnterpriseMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/check_id:%s",
chk.PartitionOrDefault(), peerName, chk.Node, chk.NamespaceOrDefault(), chk.CheckID)
return fmt.Errorf("failed to deregister check %q: %w", ident, err)
}
}
}
}
// Delete all deduplicated node checks.
for chk := range deletedNodeChecks {
// Node checks are deregistered with node-level enterprise meta in the service's partition.
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: chk.node,
CheckID: chk.checkID,
EnterpriseMeta: *nodeMeta,
PeerName: peerName,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID)
return fmt.Errorf("failed to deregister node check %q: %w", ident, err)
}
}
// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
// Re-read the node's services from the store to see what is still registered on it.
_, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
}
if ns != nil && len(ns.Services) >= 1 {
// At least one service is still registered on this node, so we keep it.
continue
}
// All services on the node were deleted, so the node is also cleaned up.
err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
})
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
}
}
return nil
}
// handleUpsertRoots stores a trust bundle received from a peer.
//
// The bundle's Partition and PeerName fields are overwritten in place so that
// it is stored in the importing partition with a reference to the peer it was
// imported from. The write goes through s.Backend.PeeringTrustBundleWrite and
// its error is returned unchanged.
func (s *Server) handleUpsertRoots(
	peerName string,
	partition string,
	trustBundle *pbpeering.PeeringTrustBundle,
) error {
	trustBundle.Partition, trustBundle.PeerName = partition, peerName

	return s.Backend.PeeringTrustBundleWrite(&pbpeering.PeeringTrustBundleWriteRequest{
		PeeringTrustBundle: trustBundle,
	})
}
// handleUpsertServerAddrs replaces the server addresses recorded on the peering
// named peerName in the given partition with the ones carried in addrs.
//
// It fails when the peering cannot be read, does not exist, or is not active.
// The stored peering is cloned before modification and the clone is written
// through s.Backend.PeeringWrite.
func (s *Server) handleUpsertServerAddrs(
	peerName string,
	partition string,
	addrs *pbpeering.PeeringServerAddresses,
) error {
	_, current, err := s.GetStore().PeeringRead(nil, state.Query{
		Value:          peerName,
		EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(partition),
	})
	if err != nil {
		return fmt.Errorf("failed to read peering: %w", err)
	}

	usable := current != nil && current.IsActive()
	if !usable {
		return fmt.Errorf("peering does not exist or has been marked for deletion")
	}

	// Work on a copy so that the object held by the state store stays untouched.
	updated := proto.Clone(current).(*pbpeering.Peering)
	updated.PeerServerAddresses = addrs.GetAddresses()

	return s.Backend.PeeringWrite(&pbpeering.PeeringWriteRequest{Peering: updated})
}
// makeACKReply builds the acknowledgement for the response identified by nonce:
// a request-type message for resourceURL that echoes the nonce and carries no error.
func makeACKReply(resourceURL, nonce string) *pbpeerstream.ReplicationMessage {
	ack := &pbpeerstream.ReplicationMessage_Request{
		ResourceURL:   resourceURL,
		ResponseNonce: nonce,
	}
	return makeReplicationRequest(ack)
}
// makeNACKReply builds the negative acknowledgement for the response identified
// by nonce: a request-type message for resourceURL that echoes the nonce and
// carries a status made of errCode and errMsg.
//
// The status is omitted only when errCode is OK and errMsg is empty.
func makeNACKReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeerstream.ReplicationMessage {
	nack := &pbpeerstream.ReplicationMessage_Request{
		ResourceURL:   resourceURL,
		ResponseNonce: nonce,
	}

	nothingToReport := errCode == code.Code_OK && errMsg == ""
	if !nothingToReport {
		nack.Error = &pbstatus.Status{
			Code:    int32(errCode),
			Message: errMsg,
		}
	}

	return makeReplicationRequest(nack)
}
// makeReplicationRequest wraps req as the payload of a Request-type ReplicationMessage.
func makeReplicationRequest(req *pbpeerstream.ReplicationMessage_Request) *pbpeerstream.ReplicationMessage {
	payload := &pbpeerstream.ReplicationMessage_Request_{Request: req}
	return &pbpeerstream.ReplicationMessage{Payload: payload}
}
// makeReplicationResponse wraps resp as the payload of a Response-type ReplicationMessage.
func makeReplicationResponse(resp *pbpeerstream.ReplicationMessage_Response) *pbpeerstream.ReplicationMessage {
	payload := &pbpeerstream.ReplicationMessage_Response_{Response: resp}
	return &pbpeerstream.ReplicationMessage{Payload: payload}
}
// nodeSvcInstIdentity uniquely identifies a service instance imported from a
// peering cluster by the ID of its node and the ID of the service. It is
// comparable and therefore usable as a map key.
type nodeSvcInstIdentity struct {
	nodeID, serviceID string
}
// nodeCheckIdentity uniquely identifies a check imported from a peering cluster
// by the ID of its node, the ID of the service and the ID of the check. It is
// comparable and therefore usable as a map key.
type nodeCheckIdentity struct {
	nodeID, serviceID, checkID string
}
// makeNodeSvcInstID builds the map key for a service instance from the ID of
// its node and the ID of the service.
func makeNodeSvcInstID(nodeID types.NodeID, serviceID string) nodeSvcInstIdentity {
	var key nodeSvcInstIdentity
	key.nodeID = string(nodeID)
	key.serviceID = serviceID
	return key
}
// makeNodeCheckID builds the map key for a check from the ID of its node, the
// ID of the service and the ID of the check.
func makeNodeCheckID(nodeID types.NodeID, serviceID string, checkID types.CheckID) nodeCheckIdentity {
	var key nodeCheckIdentity
	key.nodeID = string(nodeID)
	key.serviceID = serviceID
	key.checkID = string(checkID)
	return key
}
// buildStoredMap indexes the stored instances for fast lookup.
//
// It returns three maps: nodes keyed by node ID, service instances keyed by
// (node ID, service ID), and checks keyed by (node ID, service ID, check ID).
// When several instances map to the same key, the last one in storedInstances wins.
func buildStoredMap(storedInstances structs.CheckServiceNodes) (map[types.NodeID]*structs.Node, map[nodeSvcInstIdentity]*structs.NodeService, map[nodeCheckIdentity]*structs.HealthCheck) {
	var (
		nodes    = make(map[types.NodeID]*structs.Node)
		services = make(map[nodeSvcInstIdentity]*structs.NodeService)
		checks   = make(map[nodeCheckIdentity]*structs.HealthCheck)
	)

	for _, inst := range storedInstances {
		nodes[inst.Node.ID] = inst.Node

		svcKey := makeNodeSvcInstID(inst.Node.ID, inst.Service.ID)
		services[svcKey] = inst.Service

		for _, check := range inst.Checks {
			checkKey := makeNodeCheckID(inst.Node.ID, inst.Service.ID, check.CheckID)
			checks[checkKey] = check
		}
	}

	return nodes, services, checks
}