From 6216a96f93e00c08430047f97e136eee960999eb Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Fri, 19 May 2023 13:53:29 -0400 Subject: [PATCH] Add the workload health controller (#17215) --- agent/consul/server.go | 2 +- internal/catalog/exports.go | 13 +- .../catalog/internal/controllers/register.go | 10 +- .../controllers/workloadhealth/controller.go | 238 ++++++ .../workloadhealth/controller_test.go | 760 ++++++++++++++++++ .../controllers/workloadhealth/status.go | 11 + .../mappers/nodemapper/node_mapper.go | 104 +++ .../mappers/nodemapper/node_mapper_test.go | 146 ++++ 8 files changed, 1279 insertions(+), 5 deletions(-) create mode 100644 internal/catalog/internal/controllers/workloadhealth/controller.go create mode 100644 internal/catalog/internal/controllers/workloadhealth/controller_test.go create mode 100644 internal/catalog/internal/controllers/workloadhealth/status.go create mode 100644 internal/catalog/internal/mappers/nodemapper/node_mapper.go create mode 100644 internal/catalog/internal/mappers/nodemapper/node_mapper_test.go diff --git a/agent/consul/server.go b/agent/consul/server.go index 46adbd995..88c05b711 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -844,7 +844,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom func (s *Server) registerResources() { catalog.RegisterTypes(s.typeRegistry) - catalog.RegisterControllers(s.controllerManager) + catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) mesh.RegisterTypes(s.typeRegistry) reaper.RegisterControllers(s.controllerManager) diff --git a/internal/catalog/exports.go b/internal/catalog/exports.go index 39d6c44e6..10f2f9be0 100644 --- a/internal/catalog/exports.go +++ b/internal/catalog/exports.go @@ -5,6 +5,7 @@ package catalog import ( "github.com/hashicorp/consul/internal/catalog/internal/controllers" + "github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper" "github.com/hashicorp/consul/internal/catalog/internal/types" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource" @@ -46,8 +47,16 @@ func RegisterTypes(r resource.Registry) { types.Register(r) } +type ControllerDependencies = controllers.Dependencies + +func DefaultControllerDependencies() ControllerDependencies { + return ControllerDependencies{ + WorkloadHealthNodeMapper: nodemapper.New(), + } +} + // RegisterControllers registers controllers for the catalog types with // the given controller Manager. -func RegisterControllers(mgr *controller.Manager) { - controllers.Register(mgr) +func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) { + controllers.Register(mgr, deps) } diff --git a/internal/catalog/internal/controllers/register.go b/internal/catalog/internal/controllers/register.go index b4b6f190f..1a7987d12 100644 --- a/internal/catalog/internal/controllers/register.go +++ b/internal/catalog/internal/controllers/register.go @@ -5,9 +5,15 @@ package controllers import ( "github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth" + "github.com/hashicorp/consul/internal/catalog/internal/controllers/workloadhealth" "github.com/hashicorp/consul/internal/controller" ) -func Register(mgr *controller.Manager) { - mgr.Register(nodehealth.NodeHealthController()) +type Dependencies struct { + WorkloadHealthNodeMapper workloadhealth.NodeMapper +} + +func Register(mgr *controller.Manager, deps Dependencies) { + mgr.Register(nodehealth.NodeHealthController()) + mgr.Register(workloadhealth.WorkloadHealthController(deps.WorkloadHealthNodeMapper)) } diff --git a/internal/catalog/internal/controllers/workloadhealth/controller.go b/internal/catalog/internal/controllers/workloadhealth/controller.go new file mode 100644 index 000000000..b9c19b541 --- /dev/null +++ b/internal/catalog/internal/controllers/workloadhealth/controller.go @@ -0,0 +1,238 @@ +package workloadhealth + +import ( + "context" + "errors" + "fmt" + + "github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth" + "github.com/hashicorp/consul/internal/catalog/internal/types" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + errNodeUnreconciled = errors.New("Node health has not been reconciled yet") + errNodeHealthInvalid = errors.New("Node health has invalid reason") + errNodeHealthConditionNotFound = fmt.Errorf("Node health status is missing the %s condition", nodehealth.StatusConditionHealthy) +) + +// The NodeMapper interface is used to provide an implementation around being able to +// map a watch event for a Node resource and translate it to reconciliation requests +// for all Workloads assigned to that node. +type NodeMapper interface { + // MapNodeToWorkloads will take a Node resource and return controller requests + // for all Workloads associated with the Node. + MapNodeToWorkloads(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) + + // TrackWorkload instructs the NodeMapper to associate the given workload + // ID with the given node ID. + TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) + + // UntrackWorkload instructs the Nodemapper to forget about any + // association it was tracking for this workload. + UntrackWorkload(workloadID *pbresource.ID) + + // NodeIDFromWorkload is used to generate the resource ID for the Node referenced + // within the NodeName field of the Workload. + NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID +} + +func WorkloadHealthController(nodeMap NodeMapper) controller.Controller { + if nodeMap == nil { + panic("No NodeMapper was provided to the WorkloadHealthController constructor") + } + + return controller.ForType(types.WorkloadType). + WithWatch(types.HealthStatusType, controller.MapOwnerFiltered(types.WorkloadType)). + WithWatch(types.NodeType, nodeMap.MapNodeToWorkloads). + WithReconciler(&workloadHealthReconciler{nodeMap: nodeMap}) +} + +type workloadHealthReconciler struct { + nodeMap NodeMapper +} + +func (r *workloadHealthReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { + // The runtime is passed by value so replacing it here for the remaineder of this + // reconciliation request processing will not affect future invocations. + rt.Logger = rt.Logger.With("resource-id", req.ID, "controller", StatusKey) + + rt.Logger.Trace("reconciling workload health") + + // read the workload + rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID}) + switch { + case status.Code(err) == codes.NotFound: + rt.Logger.Trace("workload has been deleted") + r.nodeMap.UntrackWorkload(req.ID) + return nil + case err != nil: + rt.Logger.Error("the resource service has returned an unexpected error", "error", err) + return err + } + + res := rsp.Resource + var workload pbcatalog.Workload + if err := res.Data.UnmarshalTo(&workload); err != nil { + // This should be impossible and will not be exercised in tests. Various + // type validations on admission ensure that all Workloads would + // be marshallable in this way. + rt.Logger.Error("error unmarshalling workload data", "error", err) + return err + } + + nodeHealth := pbcatalog.Health_HEALTH_PASSING + if workload.NodeName != "" { + nodeID := r.nodeMap.NodeIDFromWorkload(res, &workload) + r.nodeMap.TrackWorkload(res.Id, nodeID) + nodeHealth, err = getNodeHealth(ctx, rt, nodeID) + if err != nil { + rt.Logger.Error("error looking up node health", "error", err, "node-id", nodeID) + return err + } + } else { + // the node association may be been removed so stop tracking it. + r.nodeMap.UntrackWorkload(res.Id) + } + + workloadHealth, err := getWorkloadHealth(ctx, rt, req.ID) + if err != nil { + // This should be impossible under normal operations and will not be exercised + // within the unit tests. This can only fail if the resource service fails + // or allows admission of invalid health statuses. + rt.Logger.Error("error aggregating workload health statuses", "error", err) + return err + } + + health := nodeHealth + if workloadHealth > health { + health = workloadHealth + } + + statusState := pbresource.Condition_STATE_TRUE + if health != pbcatalog.Health_HEALTH_PASSING { + statusState = pbresource.Condition_STATE_FALSE + } + + message := WorkloadHealthyMessage + if workload.NodeName != "" { + message = NodeAndWorkloadHealthyMessage + } + switch { + case workloadHealth != pbcatalog.Health_HEALTH_PASSING && nodeHealth != pbcatalog.Health_HEALTH_PASSING: + message = NodeAndWorkloadUnhealthyMessage + case workloadHealth != pbcatalog.Health_HEALTH_PASSING: + message = WorkloadUnhealthyMessage + case nodeHealth != pbcatalog.Health_HEALTH_PASSING: + message = nodehealth.NodeUnhealthyMessage + } + + newStatus := &pbresource.Status{ + ObservedGeneration: res.Generation, + Conditions: []*pbresource.Condition{ + { + Type: StatusConditionHealthy, + State: statusState, + Reason: health.String(), + Message: message, + }, + }, + } + + if resource.EqualStatus(res.Status[StatusKey], newStatus, false) { + rt.Logger.Trace("resources workload health status is unchanged", + "health", health.String(), + "node-health", nodeHealth.String(), + "workload-health", workloadHealth.String()) + return nil + } + + _, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{ + Id: res.Id, + Key: StatusKey, + Status: newStatus, + }) + + if err != nil { + rt.Logger.Error("error encountered when attempting to update the resources workload status", "error", err) + return err + } + + rt.Logger.Trace("resource's workload health status was updated", + "health", health.String(), + "node-health", nodeHealth.String(), + "workload-health", workloadHealth.String()) + return nil +} + +func getNodeHealth(ctx context.Context, rt controller.Runtime, nodeRef *pbresource.ID) (pbcatalog.Health, error) { + rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: nodeRef}) + switch { + case status.Code(err) == codes.NotFound: + return pbcatalog.Health_HEALTH_CRITICAL, nil + case err != nil: + return pbcatalog.Health_HEALTH_CRITICAL, err + default: + healthStatus, ok := rsp.Resource.Status[nodehealth.StatusKey] + if !ok { + // The Nodes health has never been reconciled and therefore the + // workloads health cannot be determined. Returning nil is acceptable + // because the controller should sometime soon run reconciliation for + // the node which will then trigger rereconciliation of this workload + return pbcatalog.Health_HEALTH_CRITICAL, errNodeUnreconciled + } + + for _, condition := range healthStatus.Conditions { + if condition.Type == nodehealth.StatusConditionHealthy { + if condition.State == pbresource.Condition_STATE_TRUE { + return pbcatalog.Health_HEALTH_PASSING, nil + } + + healthReason, valid := pbcatalog.Health_value[condition.Reason] + if !valid { + // The Nodes health is unknown - presumably the node health controller + // will come along and fix that up momentarily causing this workload + // reconciliation to occur again. + return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthInvalid + } + return pbcatalog.Health(healthReason), nil + } + } + return pbcatalog.Health_HEALTH_CRITICAL, errNodeHealthConditionNotFound + } +} + +func getWorkloadHealth(ctx context.Context, rt controller.Runtime, workloadRef *pbresource.ID) (pbcatalog.Health, error) { + rsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{ + Owner: workloadRef, + }) + + if err != nil { + return pbcatalog.Health_HEALTH_CRITICAL, err + } + + workloadHealth := pbcatalog.Health_HEALTH_PASSING + + for _, res := range rsp.Resources { + if resource.EqualType(res.Id.Type, types.HealthStatusType) { + var hs pbcatalog.HealthStatus + if err := res.Data.UnmarshalTo(&hs); err != nil { + // This should be impossible and will not be executing in tests. The resource type + // is the HealthStatus type and therefore must be unmarshallable into the HealthStatus + // object or else it wouldn't have passed admission validation checks. + return workloadHealth, fmt.Errorf("error unmarshalling health status data: %w", err) + } + + if hs.Status > workloadHealth { + workloadHealth = hs.Status + } + } + } + + return workloadHealth, nil +} diff --git a/internal/catalog/internal/controllers/workloadhealth/controller_test.go b/internal/catalog/internal/controllers/workloadhealth/controller_test.go new file mode 100644 index 000000000..29d93f088 --- /dev/null +++ b/internal/catalog/internal/controllers/workloadhealth/controller_test.go @@ -0,0 +1,760 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package workloadhealth + +import ( + "context" + "fmt" + "testing" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog/internal/controllers/nodehealth" + "github.com/hashicorp/consul/internal/catalog/internal/mappers/nodemapper" + "github.com/hashicorp/consul/internal/catalog/internal/types" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + nodeData = &pbcatalog.Node{ + Addresses: []*pbcatalog.NodeAddress{ + { + Host: "127.0.0.1", + }, + }, + } + + fakeType = &pbresource.Type{ + Group: "not", + GroupVersion: "vfake", + Kind: "found", + } +) + +func resourceID(rtype *pbresource.Type, name string) *pbresource.ID { + return &pbresource.ID{ + Type: rtype, + Tenancy: &pbresource.Tenancy{ + Partition: "default", + Namespace: "default", + PeerName: "local", + }, + Name: name, + } +} + +func workloadData(nodeName string) *pbcatalog.Workload { + return &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + { + Host: "198.18.0.1", + }, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "http": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_HTTP, + }, + }, + Identity: "test", + NodeName: nodeName, + } +} + +// controllerSuite is just the base information the three other test suites +// in this file will use. It will be embedded into the others allowing +// for the test helpers and default setup to be reused and to force consistent +// anming of the various data bits this holds on to. +type controllerSuite struct { + suite.Suite + client pbresource.ResourceServiceClient + runtime controller.Runtime +} + +func (suite *controllerSuite) SetupTest() { + suite.client = svctest.RunResourceService(suite.T(), types.Register) + suite.runtime = controller.Runtime{Client: suite.client, Logger: testutil.Logger(suite.T())} +} + +// injectNodeWithStatus is a helper method to write a Node resource and synthesize its status +// in a manner consistent with the node-health controller. This allows us to not actually +// run and test the node-health controller but consume its "api" in the form of how +// it encodes status. +func (suite *controllerSuite) injectNodeWithStatus(name string, health pbcatalog.Health) *pbresource.Resource { + suite.T().Helper() + state := pbresource.Condition_STATE_TRUE + if health >= pbcatalog.Health_HEALTH_WARNING { + state = pbresource.Condition_STATE_FALSE + } + + return resourcetest.Resource(types.NodeType, name). + WithData(suite.T(), nodeData). + WithStatus(nodehealth.StatusKey, &pbresource.Status{ + Conditions: []*pbresource.Condition{ + { + Type: nodehealth.StatusConditionHealthy, + State: state, + Reason: health.String(), + }, + }, + }). + Write(suite.T(), suite.client) +} + +// the workloadHealthControllerTestSuite intends to test the main Reconciliation +// functionality but will not do exhaustive testing of the getNodeHealth +// or getWorkloadHealth functions. Without mocking the resource service which +// we for now are avoiding, it should be impossible to inject errors into +// those functions that would force some kinds of error cases. Therefore, +// those other functions will be tested with their own test suites. +type workloadHealthControllerTestSuite struct { + controllerSuite + + mapper *nodemapper.NodeMapper + reconciler *workloadHealthReconciler +} + +func (suite *workloadHealthControllerTestSuite) SetupTest() { + // invoke all the other suite setup + suite.controllerSuite.SetupTest() + + suite.mapper = nodemapper.New() + suite.reconciler = &workloadHealthReconciler{ + nodeMap: suite.mapper, + } +} + +// testReconcileWithNode will inject a node with the given health, a workload +// associated with that node and then a health status owned by the workload +// with the given workload health. Once all the resource injection has been +// performed this will invoke the Reconcile method once on the reconciler +// and checks a couple things: +// +// * The node to workload association is now being tracked by the node mapper +// * The workloads status was updated and now matches the expected value +func (suite *workloadHealthControllerTestSuite) testReconcileWithNode(nodeHealth, workloadHealth pbcatalog.Health, status *pbresource.Condition) *pbresource.Resource { + suite.T().Helper() + + node := suite.injectNodeWithStatus("test-node", nodeHealth) + + workload := resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(suite.T(), workloadData(node.Id.Name)). + Write(suite.T(), suite.client) + + resourcetest.Resource(types.HealthStatusType, "test-status"). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: workloadHealth}). + WithOwner(workload.Id). + Write(suite.T(), suite.client) + + err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: workload.Id, + }) + + require.NoError(suite.T(), err) + + // ensure that the node is now being tracked by the mapper + reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node) + require.NoError(suite.T(), err) + require.Len(suite.T(), reqs, 1) + prototest.AssertDeepEqual(suite.T(), reqs[0].ID, workload.Id) + + suite.T().Cleanup(func() { + // future calls to reconcile would normally have done this as the resource was + // removed. In the case of reconcile being called manually, when the resources + // are automatically removed, the tracking will be stale. In most tests this step + // to remove the tracking should be unnecessary as they will not be reusing a + // mapper between subtests and so it will get "removed" as the mapper is gc'ed. + suite.mapper.UntrackWorkload(workload.Id) + }) + + return suite.checkWorkloadStatus(workload.Id, status) +} + +// testReconcileWithoutNode will inject a workload associated and then a health status +// owned by the workload with the given workload health. Once all the resource injection +// has been performed this will invoke the Reconcile method once on the reconciler +// and check that the computed status matches the expected value +// +// This is really just a tirmmed down version of testReconcileWithNode. It seemed +// simpler and easier to read if these were two separate methods instead of combining +// them in one with more branching based off of detecting whether nodes are in use. +func (suite *workloadHealthControllerTestSuite) testReconcileWithoutNode(workloadHealth pbcatalog.Health, status *pbresource.Condition) *pbresource.Resource { + suite.T().Helper() + workload := resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(suite.T(), workloadData("")). + Write(suite.T(), suite.client) + + resourcetest.Resource(types.HealthStatusType, "test-status"). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: workloadHealth}). + WithOwner(workload.Id). + Write(suite.T(), suite.client) + + err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: workload.Id, + }) + + require.NoError(suite.T(), err) + + // Read the resource back so we can detect the status changes + return suite.checkWorkloadStatus(workload.Id, status) +} + +// checkWorkloadStatus will read the workload resource and verify that its +// status has the expected value. +func (suite *workloadHealthControllerTestSuite) checkWorkloadStatus(id *pbresource.ID, status *pbresource.Condition) *pbresource.Resource { + suite.T().Helper() + + rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{ + Id: id, + }) + + require.NoError(suite.T(), err) + + actualStatus, found := rsp.Resource.Status[StatusKey] + require.True(suite.T(), found) + require.Equal(suite.T(), rsp.Resource.Generation, actualStatus.ObservedGeneration) + require.Len(suite.T(), actualStatus.Conditions, 1) + prototest.AssertDeepEqual(suite.T(), status, actualStatus.Conditions[0]) + + return rsp.Resource +} + +func (suite *workloadHealthControllerTestSuite) TestReconcile() { + // This test intends to ensure all the permutations of node health and workload + // health end up with the correct computed status. When a test case omits + // the workload health (or sets it to pbcatalog.Health_HEALTH_ANY) then the + // workloads are nodeless and therefore node health will not be considered. + // Additionally the messages put in the status for nodeless workloads are + // a little different to not mention nodes and provide the user more context + // about where the failing health checks are. + + type testCase struct { + nodeHealth pbcatalog.Health + workloadHealth pbcatalog.Health + expectedStatus *pbresource.Condition + } + + cases := map[string]testCase{ + "workload-passing": { + workloadHealth: pbcatalog.Health_HEALTH_PASSING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_TRUE, + Reason: "HEALTH_PASSING", + Message: WorkloadHealthyMessage, + }, + }, + "workload-warning": { + workloadHealth: pbcatalog.Health_HEALTH_WARNING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_WARNING", + Message: WorkloadUnhealthyMessage, + }, + }, + "workload-critical": { + workloadHealth: pbcatalog.Health_HEALTH_CRITICAL, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_CRITICAL", + Message: WorkloadUnhealthyMessage, + }, + }, + "workload-maintenance": { + workloadHealth: pbcatalog.Health_HEALTH_MAINTENANCE, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_MAINTENANCE", + Message: WorkloadUnhealthyMessage, + }, + }, + "combined-passing": { + nodeHealth: pbcatalog.Health_HEALTH_PASSING, + workloadHealth: pbcatalog.Health_HEALTH_PASSING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_TRUE, + Reason: "HEALTH_PASSING", + Message: NodeAndWorkloadHealthyMessage, + }, + }, + "combined-warning-node": { + nodeHealth: pbcatalog.Health_HEALTH_WARNING, + workloadHealth: pbcatalog.Health_HEALTH_PASSING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_WARNING", + Message: nodehealth.NodeUnhealthyMessage, + }, + }, + "combined-warning-workload": { + nodeHealth: pbcatalog.Health_HEALTH_PASSING, + workloadHealth: pbcatalog.Health_HEALTH_WARNING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_WARNING", + Message: WorkloadUnhealthyMessage, + }, + }, + "combined-critical-node": { + nodeHealth: pbcatalog.Health_HEALTH_CRITICAL, + workloadHealth: pbcatalog.Health_HEALTH_WARNING, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_CRITICAL", + Message: NodeAndWorkloadUnhealthyMessage, + }, + }, + "combined-critical-workload": { + nodeHealth: pbcatalog.Health_HEALTH_WARNING, + workloadHealth: pbcatalog.Health_HEALTH_CRITICAL, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_CRITICAL", + Message: NodeAndWorkloadUnhealthyMessage, + }, + }, + "combined-maintenance-node": { + nodeHealth: pbcatalog.Health_HEALTH_MAINTENANCE, + workloadHealth: pbcatalog.Health_HEALTH_CRITICAL, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_MAINTENANCE", + Message: NodeAndWorkloadUnhealthyMessage, + }, + }, + "combined-maintenance-workload": { + nodeHealth: pbcatalog.Health_HEALTH_CRITICAL, + workloadHealth: pbcatalog.Health_HEALTH_MAINTENANCE, + expectedStatus: &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_MAINTENANCE", + Message: NodeAndWorkloadUnhealthyMessage, + }, + }, + } + + for name, tcase := range cases { + suite.Run(name, func() { + if tcase.nodeHealth != pbcatalog.Health_HEALTH_ANY { + suite.testReconcileWithNode(tcase.nodeHealth, tcase.workloadHealth, tcase.expectedStatus) + } else { + suite.testReconcileWithoutNode(tcase.workloadHealth, tcase.expectedStatus) + } + }) + } +} + +func (suite *workloadHealthControllerTestSuite) TestReconcileReadError() { + // This test's goal is to prove that errors other than NotFound from the Resource service + // when reading the workload to reconcile will be propagate back to the Reconcile caller. + // + // Passing a resource with an unknown type isn't particularly realistic as the controller + // manager running our reconciliation will ensure all resource ids used are valid. However + // its a really easy way right not to force the error. + id := resourceID(fakeType, "blah") + + err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: id}) + require.Error(suite.T(), err) + require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) +} + +func (suite *workloadHealthControllerTestSuite) TestReconcileNotFound() { + // This test wants to ensure that tracking for a workload is removed when the workload is deleted + // so this test will inject the tracking, issue the Reconcile call which will get a + // not found error and then ensure that the tracking was removed. + + workload := resourcetest.Resource(types.WorkloadType, "foo"). + WithData(suite.T(), workloadData("test-node")). + // don't write this because then in the call to reconcile the resource + // would be found and defeat the purpose of the tes + Build() + + node := resourcetest.Resource(types.NodeType, "test-node"). + WithData(suite.T(), nodeData). + // Whether this gets written or not doesn't matter + Build() + + // Track the workload - this simulates a previous round of reconciliation + // where the workload existed and was associated to the node. Other tests + // will cover more of the lifecycle of the controller so for the purposes + // of this test we can just inject it ourselves. + suite.mapper.TrackWorkload(workload.Id, node.Id) + + // check that the worklooad is in fact tracked properly + reqs, err := suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node) + + require.NoError(suite.T(), err) + require.Len(suite.T(), reqs, 1) + prototest.AssertDeepEqual(suite.T(), workload.Id, reqs[0].ID) + + // This workload was never actually inserted so the request should return a NotFound + // error and remove the workload from tracking + require.NoError( + suite.T(), + suite.reconciler.Reconcile( + context.Background(), + suite.runtime, + controller.Request{ID: workload.Id})) + + // Check the mapper again to ensure the node:workload association was removed. + reqs, err = suite.mapper.MapNodeToWorkloads(context.Background(), suite.runtime, node) + require.NoError(suite.T(), err) + require.Empty(suite.T(), reqs) +} + +func (suite *workloadHealthControllerTestSuite) TestGetNodeHealthError() { + // This test aims to ensure that errors coming from the getNodeHealth + // function are propagated back to the caller. In order to do so + // we are going to inject a node but not set its status yet. This + // simulates the condition where the workload health controller happened + // to start reconciliation before the node health controller. In that + // case we also expect the errNodeUnreconciled error to be returned + // but the exact error isn't very relevant to the core reason this + // test exists. + + node := resourcetest.Resource(types.NodeType, "test-node"). + WithData(suite.T(), nodeData). + Write(suite.T(), suite.client) + + workload := resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(suite.T(), workloadData(node.Id.Name)). + Write(suite.T(), suite.client) + + resourcetest.Resource(types.HealthStatusType, "test-status"). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}). + WithOwner(workload.Id). + Write(suite.T(), suite.client) + + err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ + ID: workload.Id, + }) + + require.Error(suite.T(), err) + require.Equal(suite.T(), errNodeUnreconciled, err) +} + +func (suite *workloadHealthControllerTestSuite) TestReconcile_AvoidReconciliationWrite() { + // The sole purpose of this test is to ensure that calls to Reconcile for an already + // reconciled workload will not perform extra/unnecessary status writes. Basically + // we check that calling Reconcile twice in a row without any actual health change + // doesn't bump the Version (which would increased for any write of the resource + // or its status) + status := &pbresource.Condition{ + Type: StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "HEALTH_WARNING", + Message: WorkloadUnhealthyMessage, + } + res1 := suite.testReconcileWithoutNode(pbcatalog.Health_HEALTH_WARNING, status) + + err := suite.reconciler.Reconcile(context.Background(), suite.runtime, controller.Request{ID: res1.Id}) + require.NoError(suite.T(), err) + + // check that the status hasn't changed + res2 := suite.checkWorkloadStatus(res1.Id, status) + + // If another status write was performed then the versions would differ. This + // therefore proves that after a second reconciliation without any change + // in status that the controller is not making extra status writes. + require.Equal(suite.T(), res1.Version, res2.Version) +} + +func (suite *workloadHealthControllerTestSuite) TestController() { + // This test aims to be a very light weight integration test of the + // controller with the controller manager as well as a general + // controller lifecycle test. + + // create the controller manager + mgr := controller.NewManager(suite.client, testutil.Logger(suite.T())) + + // register our controller + mgr.Register(WorkloadHealthController(suite.mapper)) + mgr.SetRaftLeader(true) + ctx, cancel := context.WithCancel(context.Background()) + suite.T().Cleanup(cancel) + + // run the manager + go mgr.Run(ctx) + + // create a node to link things with + node := suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_PASSING) + + // create the workload + workload := resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(suite.T(), workloadData(node.Id.Name)). + Write(suite.T(), suite.client) + + // Wait for reconciliation to occur and mark the workload as passing. + suite.waitForReconciliation(workload.Id, "HEALTH_PASSING") + + // Simulate a node unhealty + suite.injectNodeWithStatus("test-node", pbcatalog.Health_HEALTH_WARNING) + + // Wait for reconciliation to occur and mark the workload as warning + // due to the node going into the warning state. + suite.waitForReconciliation(workload.Id, "HEALTH_WARNING") + + // Now register a critical health check that should supercede the nodes + // warning status + + resourcetest.Resource(types.HealthStatusType, "test-status"). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_CRITICAL}). + WithOwner(workload.Id). + Write(suite.T(), suite.client) + + // Wait for reconciliation to occur again and mark the workload as unhealthy + suite.waitForReconciliation(workload.Id, "HEALTH_CRITICAL") + + // Put the health status back into a passing state and delink the node + resourcetest.Resource(types.HealthStatusType, "test-status"). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: pbcatalog.Health_HEALTH_PASSING}). + WithOwner(workload.Id). + Write(suite.T(), suite.client) + workload = resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(suite.T(), workloadData("")). + Write(suite.T(), suite.client) + + // Now that the workload health is passing and its not associated with the node its status should + // eventually become passing + suite.waitForReconciliation(workload.Id, "HEALTH_PASSING") +} + +// wait for reconciliation is a helper to check if a resource has been reconciled and +// is marked with the expected status. +func (suite *workloadHealthControllerTestSuite) waitForReconciliation(id *pbresource.ID, reason string) { + suite.T().Helper() + + retry.Run(suite.T(), func(r *retry.R) { + rsp, err := suite.client.Read(context.Background(), &pbresource.ReadRequest{ + Id: id, + }) + require.NoError(r, err) + + status, found := rsp.Resource.Status[StatusKey] + require.True(r, found) + require.Equal(r, rsp.Resource.Generation, status.ObservedGeneration) + require.Len(r, status.Conditions, 1) + require.Equal(r, reason, status.Conditions[0].Reason) + }) +} + +func TestWorkloadHealthController(t *testing.T) { + suite.Run(t, new(workloadHealthControllerTestSuite)) +} + +type getWorkloadHealthTestSuite struct { + controllerSuite +} + +func (suite *getWorkloadHealthTestSuite) addHealthStatuses(workload *pbresource.ID, desiredHealth pbcatalog.Health) { + // In order to exercise the behavior to ensure that the ordering a health status is + // seen doesn't matter this is strategically naming health status so that they will be + // returned in an order with the most precedent status being in the middle of the list. + // This will ensure that statuses seen later can override a previous status that that + // status seen later do not override if they would lower the overall status such as + // going from critical -> warning. + healthStatuses := []pbcatalog.Health{ + pbcatalog.Health_HEALTH_PASSING, + pbcatalog.Health_HEALTH_WARNING, + pbcatalog.Health_HEALTH_CRITICAL, + pbcatalog.Health_HEALTH_MAINTENANCE, + pbcatalog.Health_HEALTH_CRITICAL, + pbcatalog.Health_HEALTH_WARNING, + pbcatalog.Health_HEALTH_PASSING, + } + + for idx, health := range healthStatuses { + if desiredHealth >= health { + resourcetest.Resource(types.HealthStatusType, fmt.Sprintf("check-%s-%d", workload.Name, idx)). + WithData(suite.T(), &pbcatalog.HealthStatus{Type: "tcp", Status: health}). + WithOwner(workload). + Write(suite.T(), suite.client) + } + } +} + +func (suite *getWorkloadHealthTestSuite) TestListError() { + // This test's goal is to exercise the error propgataion behavior within + // getWorkloadHealth. When the resource listing fails, we want to + // propagate the error which should eventually result in retrying + // the operation. + health, err := getWorkloadHealth(context.Background(), suite.runtime, resourceID(fakeType, "foo")) + + require.Error(suite.T(), err) + require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) +} + +func (suite *getWorkloadHealthTestSuite) TestNoHealthStatuses() { + // This test's goal is to ensure that when no HealthStatuses are owned by the + // workload that the health is assumed to be passing. + workload := resourcetest.Resource(types.WorkloadType, "foo"). + WithData(suite.T(), workloadData("")). + Write(suite.T(), suite.client) + + health, err := getWorkloadHealth(context.Background(), suite.runtime, workload.Id) + require.NoError(suite.T(), err) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_PASSING, health) +} + +func (suite *getWorkloadHealthTestSuite) TestWithStatuses() { + // This test's goal is to ensure that the health calculation given multiple + // statuses results in the most precedent winning. The addHealthStatuses + // helper method is used to inject multiple statuses in a way such that + // the resource service will return them in a predictable order and can + // properly exercise the code. + for value, status := range pbcatalog.Health_name { + health := pbcatalog.Health(value) + if health == pbcatalog.Health_HEALTH_ANY { + continue + } + + suite.Run(status, func() { + workload := resourcetest.Resource(types.WorkloadType, "foo"). + WithData(suite.T(), workloadData("")). + Write(suite.T(), suite.client) + + suite.addHealthStatuses(workload.Id, health) + + actualHealth, err := getWorkloadHealth(context.Background(), suite.runtime, workload.Id) + require.NoError(suite.T(), err) + require.Equal(suite.T(), health, actualHealth) + }) + } +} + +func TestGetWorkloadHealth(t *testing.T) { + suite.Run(t, new(getWorkloadHealthTestSuite)) +} + +type getNodeHealthTestSuite struct { + controllerSuite +} + +func (suite *getNodeHealthTestSuite) TestNotfound() { + // This test's goal is to ensure that getNodeHealth when called with a node id that isn't + // present in the system results in a the critical health but no error. This situation + // could occur when a linked node gets removed without the workloads being modified/removed. + // When that occurs we want to steer traffic away from the linked node as soon as possible. + health, err := getNodeHealth(context.Background(), suite.runtime, resourceID(types.NodeType, "not-found")) + require.NoError(suite.T(), err) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) + +} + +func (suite *getNodeHealthTestSuite) TestReadError() { + // This test's goal is to ensure the getNodeHealth propagates unexpected errors from + // its resource read call back to the caller. + health, err := getNodeHealth(context.Background(), suite.runtime, resourceID(fakeType, "not-found")) + require.Error(suite.T(), err) + require.Equal(suite.T(), codes.InvalidArgument, status.Code(err)) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) +} + +func (suite *getNodeHealthTestSuite) TestUnreconciled() { + // This test's goal is to ensure that nodes with unreconciled health are deemed + // critical. Basically, the workload health controller should defer calculating + // the workload health until the associated nodes health is known. + node := resourcetest.Resource(types.NodeType, "unreconciled"). + WithData(suite.T(), nodeData). + Write(suite.T(), suite.client). + GetId() + + health, err := getNodeHealth(context.Background(), suite.runtime, node) + require.Error(suite.T(), err) + require.Equal(suite.T(), errNodeUnreconciled, err) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) +} + +func (suite *getNodeHealthTestSuite) TestNoConditions() { + // This test's goal is to ensure that if a node's health status doesn't have + // the expected condition then its deemedd critical. This should never happen + // in the integrated system as the node health controller would have to be + // buggy to add an empty status. However it could also indicate some breaking + // change went in. Regardless, the code to handle this state is written + // and it will be tested here. + node := resourcetest.Resource(types.NodeType, "no-conditions"). + WithData(suite.T(), nodeData). + WithStatus(nodehealth.StatusKey, &pbresource.Status{}). + Write(suite.T(), suite.client). + GetId() + + health, err := getNodeHealth(context.Background(), suite.runtime, node) + require.Error(suite.T(), err) + require.Equal(suite.T(), errNodeHealthConditionNotFound, err) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) +} + +func (suite *getNodeHealthTestSuite) TestInvalidReason() { + // This test has the same goal as TestNoConditions which is to ensure that if + // the node health status isn't properly formed then we assume it is unhealthy. + // Just like that other test, it should be impossible for the normal running + // system to actually get into this state or at least for the node-health + // controller to put it into this state. As users or other controllers could + // potentially force it into this state by writing the status themselves, it + // would be good to ensure the defined behavior works as expected. + node := resourcetest.Resource(types.NodeType, "invalid-reason"). + WithData(suite.T(), nodeData). + WithStatus(nodehealth.StatusKey, &pbresource.Status{ + Conditions: []*pbresource.Condition{ + { + Type: nodehealth.StatusConditionHealthy, + State: pbresource.Condition_STATE_FALSE, + Reason: "INVALID_REASON", + }, + }, + }). + Write(suite.T(), suite.client). + GetId() + + health, err := getNodeHealth(context.Background(), suite.runtime, node) + require.Error(suite.T(), err) + require.Equal(suite.T(), errNodeHealthInvalid, err) + require.Equal(suite.T(), pbcatalog.Health_HEALTH_CRITICAL, health) +} + +func (suite *getNodeHealthTestSuite) TestValidHealth() { + // This test aims to ensure that all status that would be reported by the node-health + // controller gets accurately detected and returned by the getNodeHealth function. + for value, healthStr := range pbcatalog.Health_name { + health := pbcatalog.Health(value) + + // this is not a valid health that a health status + // may be in. + if health == pbcatalog.Health_HEALTH_ANY { + continue + } + + suite.T().Run(healthStr, func(t *testing.T) { + node := suite.injectNodeWithStatus("test-node", health) + + actualHealth, err := getNodeHealth(context.Background(), suite.runtime, node.Id) + require.NoError(t, err) + require.Equal(t, health, actualHealth) + }) + } +} + +func TestGetNodeHealth(t *testing.T) { + suite.Run(t, new(getNodeHealthTestSuite)) +} diff --git a/internal/catalog/internal/controllers/workloadhealth/status.go b/internal/catalog/internal/controllers/workloadhealth/status.go new file mode 100644 index 000000000..a7da3fad6 --- /dev/null +++ b/internal/catalog/internal/controllers/workloadhealth/status.go @@ -0,0 +1,11 @@ +package workloadhealth + +const ( + StatusKey = "consul.io/workload-health" + StatusConditionHealthy = "healthy" + + NodeAndWorkloadHealthyMessage = "All workload and associated node health checks are passing" + WorkloadHealthyMessage = "All workload health checks are passing" + NodeAndWorkloadUnhealthyMessage = "One or more workload and node health checks are not passing" + WorkloadUnhealthyMessage = "One or more workload health checks are not passing" +) diff --git a/internal/catalog/internal/mappers/nodemapper/node_mapper.go b/internal/catalog/internal/mappers/nodemapper/node_mapper.go new file mode 100644 index 000000000..8eea26dd0 --- /dev/null +++ b/internal/catalog/internal/mappers/nodemapper/node_mapper.go @@ -0,0 +1,104 @@ +package nodemapper + +import ( + "context" + "sync" + + "github.com/hashicorp/consul/internal/catalog/internal/types" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +type NodeMapper struct { + lock sync.Mutex + nodesToWorkloads map[string][]controller.Request + workloadsToNodes map[string]string +} + +func New() *NodeMapper { + return &NodeMapper{ + workloadsToNodes: make(map[string]string), + nodesToWorkloads: make(map[string][]controller.Request), + } +} + +// NodeIDFromWorkload will create a resource ID referencing the Node type with the same tenancy as +// the workload and with the name populated from the workloads NodeName field. +func (m *NodeMapper) NodeIDFromWorkload(workload *pbresource.Resource, workloadData *pbcatalog.Workload) *pbresource.ID { + return &pbresource.ID{ + Type: types.NodeType, + Tenancy: workload.Id.Tenancy, + Name: workloadData.NodeName, + } +} + +// MapNodeToWorkloads will take a Node resource and return controller requests +// for all Workloads associated with the Node. +func (m *NodeMapper) MapNodeToWorkloads(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + m.lock.Lock() + defer m.lock.Unlock() + return m.nodesToWorkloads[res.Id.Name], nil +} + +// TrackWorkload instructs the NodeMapper to associate the given workload +// ID with the given node ID. +func (m *NodeMapper) TrackWorkload(workloadID *pbresource.ID, nodeID *pbresource.ID) { + m.lock.Lock() + defer m.lock.Unlock() + + if previousNode, found := m.workloadsToNodes[workloadID.Name]; found && previousNode == nodeID.Name { + return + } else if found { + // the node association is being changed + m.untrackWorkloadFromNode(workloadID, previousNode) + } + + // Now set up the latest tracking + m.nodesToWorkloads[nodeID.Name] = append(m.nodesToWorkloads[nodeID.Name], controller.Request{ID: workloadID}) + m.workloadsToNodes[workloadID.Name] = nodeID.Name +} + +// UntrackWorkload will cause the node mapper to forget about the specified +// workload if it is currently tracking it. +func (m *NodeMapper) UntrackWorkload(workloadID *pbresource.ID) { + m.lock.Lock() + defer m.lock.Unlock() + + node, found := m.workloadsToNodes[workloadID.Name] + if !found { + return + } + m.untrackWorkloadFromNode(workloadID, node) +} + +// untrackWorkloadFromNode will disassociate the specified workload and node. +// This method will clean up unnecessary tracking entries if the node name +// is no longer associated with any workloads. +func (m *NodeMapper) untrackWorkloadFromNode(workloadID *pbresource.ID, node string) { + foundIdx := -1 + for idx, req := range m.nodesToWorkloads[node] { + if resource.EqualID(req.ID, workloadID) { + foundIdx = idx + break + } + } + + if foundIdx != -1 { + workloads := m.nodesToWorkloads[node] + l := len(workloads) + + if l == 1 { + delete(m.nodesToWorkloads, node) + } else if foundIdx == l-1 { + m.nodesToWorkloads[node] = workloads[:foundIdx] + } else if foundIdx == 0 { + m.nodesToWorkloads[node] = workloads[1:] + } else { + m.nodesToWorkloads[node] = append(workloads[:foundIdx], workloads[foundIdx+1:]...) + } + } + + delete(m.workloadsToNodes, workloadID.Name) +} diff --git a/internal/catalog/internal/mappers/nodemapper/node_mapper_test.go b/internal/catalog/internal/mappers/nodemapper/node_mapper_test.go new file mode 100644 index 000000000..acfe67cf6 --- /dev/null +++ b/internal/catalog/internal/mappers/nodemapper/node_mapper_test.go @@ -0,0 +1,146 @@ +package nodemapper + +import ( + "context" + "testing" + + "github.com/hashicorp/consul/internal/catalog/internal/types" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestNodeMapper_NodeIDFromWorkload(t *testing.T) { + mapper := New() + + data := &pbcatalog.Workload{ + NodeName: "test-node", + // the other fields should be irrelevant + } + + workload := resourcetest.Resource(types.WorkloadType, "test-workload"). + WithData(t, data).Build() + + actual := mapper.NodeIDFromWorkload(workload, data) + expected := &pbresource.ID{ + Type: types.NodeType, + Tenancy: workload.Id.Tenancy, + Name: "test-node", + } + + prototest.AssertDeepEqual(t, expected, actual) +} + +func requireWorkloadsTracked(t *testing.T, mapper *NodeMapper, node *pbresource.Resource, workloads ...*pbresource.ID) { + t.Helper() + reqs, err := mapper.MapNodeToWorkloads( + context.Background(), + controller.Runtime{}, + node) + + require.NoError(t, err) + require.Len(t, reqs, len(workloads)) + for _, workload := range workloads { + prototest.AssertContainsElement(t, reqs, controller.Request{ID: workload}) + } +} + +func TestNodeMapper_WorkloadTracking(t *testing.T) { + mapper := New() + + node1 := resourcetest.Resource(types.NodeType, "node1"). + WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.1"}}}). + Build() + + node2 := resourcetest.Resource(types.NodeType, "node2"). + WithData(t, &pbcatalog.Node{Addresses: []*pbcatalog.NodeAddress{{Host: "198.18.0.2"}}}). + Build() + + tenant := &pbresource.Tenancy{ + Partition: "default", + Namespace: "default", + PeerName: "local", + } + + workload1 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload1"} + workload2 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload2"} + workload3 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload3"} + workload4 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload4"} + workload5 := &pbresource.ID{Type: types.WorkloadType, Tenancy: tenant, Name: "workload5"} + + // No Workloads have been tracked so the mapper should return empty lists + requireWorkloadsTracked(t, mapper, node1) + requireWorkloadsTracked(t, mapper, node2) + // As nothing is tracked these should be pretty much no-ops + mapper.UntrackWorkload(workload1) + mapper.UntrackWorkload(workload2) + mapper.UntrackWorkload(workload2) + mapper.UntrackWorkload(workload3) + mapper.UntrackWorkload(workload4) + mapper.UntrackWorkload(workload5) + + // Now track some workloads + mapper.TrackWorkload(workload1, node1.Id) + mapper.TrackWorkload(workload2, node1.Id) + mapper.TrackWorkload(workload3, node2.Id) + mapper.TrackWorkload(workload4, node2.Id) + + // Mapping should now return 2 workload requests for each node + requireWorkloadsTracked(t, mapper, node1, workload1, workload2) + requireWorkloadsTracked(t, mapper, node2, workload3, workload4) + + // Track the same workloads again, this should end up being mostly a no-op + mapper.TrackWorkload(workload1, node1.Id) + mapper.TrackWorkload(workload2, node1.Id) + mapper.TrackWorkload(workload3, node2.Id) + mapper.TrackWorkload(workload4, node2.Id) + + // Mappings should be unchanged from the initial workload tracking + requireWorkloadsTracked(t, mapper, node1, workload1, workload2) + requireWorkloadsTracked(t, mapper, node2, workload3, workload4) + + // Change the workload association for workload2 + mapper.TrackWorkload(workload2, node2.Id) + + // Node1 should now track just the single workload and node2 should track 3 + requireWorkloadsTracked(t, mapper, node1, workload1) + requireWorkloadsTracked(t, mapper, node2, workload2, workload3, workload4) + + // Untrack the workloads - this is done in very specific ordering to ensure all + // the workload tracking removal paths get hit. This does assume that the ordering + // of requests is stable between removals. + + // remove the one and only workload from a node + mapper.UntrackWorkload(workload1) + requireWorkloadsTracked(t, mapper, node1) + + // track an additional workload + mapper.TrackWorkload(workload5, node2.Id) + reqs, err := mapper.MapNodeToWorkloads(context.Background(), controller.Runtime{}, node2) + require.NoError(t, err) + require.Len(t, reqs, 4) + + first := reqs[0].ID + second := reqs[1].ID + third := reqs[2].ID + fourth := reqs[3].ID + + // remove from the middle of the request list + mapper.UntrackWorkload(second) + requireWorkloadsTracked(t, mapper, node2, first, third, fourth) + + // remove from the end of the list + mapper.UntrackWorkload(fourth) + requireWorkloadsTracked(t, mapper, node2, first, third) + + // remove from the beginning of the list + mapper.UntrackWorkload(first) + requireWorkloadsTracked(t, mapper, node2, third) + + // remove the last element + mapper.UntrackWorkload(third) + requireWorkloadsTracked(t, mapper, node2) +}