package validate

import (
	"fmt"
	"strings"

	envoy_admin_v3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
	envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
	envoy_resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"github.com/hashicorp/consul/api"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/hashicorp/consul/envoyextensions/extensioncommon"
)

const builtinValidateExtension = "builtin/proxy/validate"

// Validate contains input information about which proxy resources to validate and output information about resources it
// has validated.
type Validate struct {
	// envoyID is an argument to the Validate plugin and identifies which listener to begin the validation with.
	envoyID string

	// vip is an argument to the Validate plugin and identifies which transparent proxy listener to begin the validation
	// with.
	vip string

	// snis is all of the upstream SNIs for this proxy. It is populated in MakeValidate from the upstream's SNI set.
	snis map[string]struct{}

	// listener specifies if the service's listener has been seen.
	listener bool

	// usesRDS determines if the listener's outgoing filter uses RDS.
	usesRDS bool

	// route specifies if the service's route has been seen.
	route bool

	// resources is a mapping from SNI to the expected resources
	// for that SNI. It is populated based on the cluster names on routes
	// (whether they are specified on listener filters or routes).
	resources map[string]*resource
}

type resource struct {
	// required determines if the resource is required for the given upstream.
	required bool

	// cluster specifies if the cluster has been seen.
	cluster bool

	// aggregateCluster determines if the resource is an aggregate cluster.
	aggregateCluster bool

	// aggregateClusterChildren is a list of SNIs to identify the child clusters of this aggregate cluster.
	aggregateClusterChildren []string

	// parentCluster is empty if this is a top level cluster, and has a value if this is a child of an aggregate
	// cluster.
	parentCluster string

	// loadAssignment specifies if the load assignment has been seen. DoEndpointValidation sets it when a cluster
	// status with this resource's SNI is found.
	loadAssignment bool

	// usesEDS specifies if the cluster has EDS configured.
	usesEDS bool

	// endpoints is the number of endpoints for the cluster or load assignment. PatchCluster sets it from a static
	// load assignment; DoEndpointValidation overwrites it with the number of healthy hosts.
	endpoints int
}

var _ extensioncommon.BasicExtension = (*Validate)(nil)

// EndpointValidator allows us to inject a different function for tests. It is called with a resource, the resource's
// SNI, and the Envoy admin cluster statuses, and is expected to update the resource's endpoint state.
type EndpointValidator func(*resource, string, *envoy_admin_v3.Clusters)

// MakeValidate constructs the builtin/proxy/validate extension from ext.
//
// The "envoyID" extension argument (when it is a string) selects the listener/route to validate. If the service has
// an entry in ext.Upstreams, its VIP and SNIs are also recorded. An error is returned when the extension name is not
// "builtin/proxy/validate", when the upstream entry exists but has no SNIs, or when neither an envoyID nor a VIP is
// available.
func MakeValidate(ext extensioncommon.RuntimeConfig) (extensioncommon.BasicExtension, error) {
	if name := ext.EnvoyExtension.Name; name != builtinValidateExtension {
		return nil, fmt.Errorf("expected extension name 'builtin/proxy/validate' but got %q", name)
	}

	// A missing or non-string argument yields ""; the VIP can still identify the upstream in that case.
	envoyID, _ := ext.EnvoyExtension.Arguments["envoyID"].(string)

	vip := ""
	snis := map[string]struct{}{}
	if upstream, ok := ext.Upstreams[ext.ServiceName]; ok {
		vip = upstream.VIP
		// len of a nil map is 0, so this also covers an unset SNI map.
		if len(upstream.SNI) == 0 {
			return nil, fmt.Errorf("no SNIs were set, unable to validate Envoy clusters")
		}
		snis = upstream.SNI
	}

	if envoyID == "" && vip == "" {
		return nil, fmt.Errorf("envoyID or virtual IP is required")
	}

	return &Validate{
		envoyID:   envoyID,
		vip:       vip,
		snis:      snis,
		resources: make(map[string]*resource),
	}, nil
}

// Messages is an ordered list of validation results produced by GetMessages.
type Messages []Message

// Message is a single validation result.
type Message struct {
	// Success reports whether the check described by Message passed.
	Success bool
	// Message is a human-readable description of the check's outcome.
	Message string
	// PossibleActions is not set anywhere in this file; presumably it carries remediation hints for callers to
	// display — NOTE(review): confirm with callers.
	PossibleActions string
}

// Success reports whether every message in m succeeded. An empty list counts as successful.
func (m Messages) Success() bool {
	for _, message := range m {
		if !message.Success {
			return false
		}
	}
	return true
}

// Errors returns the unsuccessful messages in m, preserving their order. It returns nil when there are none.
func (m Messages) Errors() Messages {
	var failed Messages
	for _, message := range m {
		if !message.Success {
			failed = append(failed, message)
		}
	}
	return failed
}

// GetMessages returns the validation results based only on Validate's state, as accumulated by the Patch* methods.
//
// It always reports on the listener and on the route, then on each required resource: whether its cluster was seen
// and its SNI belongs to this upstream, and — when validateEndpoints is true — whether it has healthy endpoints, as
// determined by calling endpointValidator with clusters. Passthrough clusters skip the endpoint check, aggregate
// clusters pass when at least one child has healthy endpoints, and child clusters are only checked via their parent.
func (v *Validate) GetMessages(validateEndpoints bool, endpointValidator EndpointValidator, clusters *envoy_admin_v3.Clusters) Messages {
	var messages Messages

	// Identify the upstream by envoyID when set, falling back to the VIP (transparent proxy case).
	upstream := v.envoyID
	if upstream == "" {
		upstream = v.vip
	}

	if !v.listener {
		messages = append(messages, Message{Message: fmt.Sprintf("no listener for upstream %q", upstream)})
	} else {
		messages = append(messages, Message{
			Message: fmt.Sprintf("listener for upstream %q found", upstream),
			Success: true,
		})
	}

	// A separate route resource is only expected when the listener's filter uses RDS.
	if v.usesRDS && !v.route {
		messages = append(messages, Message{Message: fmt.Sprintf("no route for upstream %q", upstream)})
	} else {
		messages = append(messages, Message{
			Message: fmt.Sprintf("route for upstream %q found", upstream),
			Success: true,
		})
	}

	numRequiredResources := 0
	// Resources are marked as required in PatchFilter or PatchRoute (and propagated to aggregate cluster children in
	// PatchCluster) because the listener or route determines which clusters/endpoints to validate.
	for sni, res := range v.resources {
		if !res.required {
			continue
		}
		numRequiredResources++

		if _, ok := v.snis[sni]; !ok || !res.cluster {
			messages = append(messages, Message{Message: fmt.Sprintf("no cluster %q for upstream %q", sni, upstream)})
			continue
		}
		messages = append(messages, Message{
			Message: fmt.Sprintf("cluster %q for upstream %q found", sni, upstream),
			Success: true,
		})

		// If the resource is a passthrough cluster, it will not have endpoints, so we need to skip the endpoint
		// validation.
		if strings.Contains(sni, "passthrough~") {
			messages = append(messages, Message{
				Message: fmt.Sprintf("cluster %q is a passthrough cluster, skipping endpoint healthiness check", sni),
				Success: true,
			})
			continue
		}

		if !validateEndpoints {
			continue
		}

		switch {
		case res.parentCluster != "":
			// Child cluster case: skip, since it'll be verified by the parent aggregate cluster.

		case res.aggregateCluster:
			// Aggregate cluster case: do endpoint verification by checking each child cluster. We need at least one
			// child cluster to have healthy endpoints. Every child is validated (no early exit) so each child's
			// endpoint state is refreshed.
			oneClusterHasEndpoints := false
			for _, childName := range res.aggregateClusterChildren {
				child, ok := v.resources[childName]
				if !ok {
					// PatchCluster always registers children, so this is purely defensive.
					continue
				}
				endpointValidator(child, childName, clusters)
				if child.endpoints > 0 {
					oneClusterHasEndpoints = true
				}
			}
			if !oneClusterHasEndpoints {
				messages = append(messages, Message{Message: fmt.Sprintf("no healthy endpoints for aggregate cluster %q for upstream %q", sni, upstream)})
			} else {
				messages = append(messages, Message{
					Message: fmt.Sprintf("healthy endpoints for aggregate cluster %q for upstream %q", sni, upstream),
					Success: true,
				})
			}

		default:
			// Top-level non-aggregate cluster case: check for load assignment and healthy endpoints.
			endpointValidator(res, sni, clusters)
			if (res.usesEDS && !res.loadAssignment) || res.endpoints == 0 {
				messages = append(messages, Message{
					Message: fmt.Sprintf("no healthy endpoints for cluster %q for upstream %q", sni, upstream),
				})
			} else {
				messages = append(messages, Message{
					Message: fmt.Sprintf("healthy endpoints for cluster %q for upstream %q", sni, upstream),
					Success: true,
				})
			}
		}
	}

	if numRequiredResources == 0 {
		messages = append(messages, Message{Message: "no clusters found on route or listener"})
	}

	return messages
}

// DoEndpointValidation implements the EndpointValidator function type.
//
// It looks for the cluster status named sni in clusters and records on r whether it was found (loadAssignment) and
// how many of its hosts are healthy (endpoints). A host counts as healthy when its EDS health status is HEALTHY and it
// has not failed an outlier check. If clusters has no cluster statuses (including a nil clusters), r is left
// unchanged.
func DoEndpointValidation(r *resource, sni string, clusters *envoy_admin_v3.Clusters) {
	clusterStatuses := clusters.GetClusterStatuses()
	if clusterStatuses == nil {
		return
	}

	var status *envoy_admin_v3.ClusterStatus
	for _, s := range clusterStatuses {
		if s.GetName() == sni {
			status = s
			break
		}
	}
	r.loadAssignment = status != nil

	// GetHostStatuses is nil-safe, so a missing status simply yields zero healthy endpoints.
	healthyEndpoints := 0
	for _, h := range status.GetHostStatuses() {
		health := h.GetHealthStatus()
		if health == nil {
			continue
		}
		if health.EdsHealthStatus == envoy_core_v3.HealthStatus_HEALTHY && !health.FailedOutlierCheck {
			healthyEndpoints++
		}
	}
	r.endpoints = healthyEndpoints
}

// CanApply determines if the extension can apply to the given extension configuration. Validation applies to every
// configuration.
func (v *Validate) CanApply(config *extensioncommon.RuntimeConfig) bool {
	return true
}

// PatchRoute records the route whose name matches the envoyID and marks every cluster it references as required.
// The route is returned unmodified.
func (v *Validate) PatchRoute(config *extensioncommon.RuntimeConfig, route *envoy_route_v3.RouteConfiguration) (*envoy_route_v3.RouteConfiguration, bool, error) {
	// Route name on connect proxies will be the envoy ID. We are only validating routes for the specific upstream with
	// the envoyID configured.
	if route.Name != v.envoyID {
		return route, false, nil
	}
	v.route = true
	for sni := range extensioncommon.RouteClusterNames(route) {
		// Mark existing entries too: a cluster may have been recorded by PatchCluster before the route was seen.
		v.markRequired(sni)
	}
	return route, false, nil
}

// PatchCluster records that cluster c has been seen. For aggregate clusters it records the child clusters and links
// them to the parent; otherwise it records whether the cluster uses EDS or, for a static load assignment, how many
// endpoints it has. The cluster is returned unmodified.
func (v *Validate) PatchCluster(config *extensioncommon.RuntimeConfig, c *envoy_cluster_v3.Cluster) (*envoy_cluster_v3.Cluster, bool, error) {
	res, ok := v.resources[c.Name]
	if !ok {
		res = &resource{}
		v.resources[c.Name] = res
	}
	res.cluster = true

	// If it's an aggregate cluster, add the child clusters to v.resources if they are not already there.
	if aggregateCluster, ok := isAggregateCluster(c); ok {
		// Mark this as an aggregate cluster, so we know we do not need to validate its endpoints directly.
		res.aggregateCluster = true
		for _, childName := range aggregateCluster.Clusters {
			child, ok := v.resources[childName]
			if !ok {
				child = &resource{}
				v.resources[childName] = child
			}
			// On the parent cluster, add the children.
			res.aggregateClusterChildren = append(res.aggregateClusterChildren, childName)
			// On the child cluster, set the parent.
			child.parentCluster = c.Name
			// The child clusters of an aggregate cluster will be required if the parent cluster is. Never clear a
			// required flag that was already set (e.g. by a listener filter referencing the child directly).
			if res.required {
				child.required = true
			}
		}
		return c, false, nil
	}

	if c.EdsClusterConfig != nil {
		res.usesEDS = true
		return c, false, nil
	}
	if la := c.LoadAssignment; la != nil {
		res.endpoints = len(la.Endpoints) + len(la.NamedEndpoints)
	}
	return c, false, nil
}

// PatchFilter records that the listener has a filter and determines which clusters must be validated.
//
// For an HTTP connection manager that uses RDS, the clusters are listed on the route instead, so the filter only
// records the route name as the envoyID. Otherwise every cluster named by the (HTTP or TCP) filter is marked as
// required. The filter itself is returned unmodified.
// NOTE(review): this reports the filter as patched (true) even though it is not changed; confirm callers rely on it.
func (v *Validate) PatchFilter(config *extensioncommon.RuntimeConfig, filter *envoy_listener_v3.Filter) (*envoy_listener_v3.Filter, bool, error) {
	// If a single filter exists for a listener we say it exists.
	v.listener = true

	if httpConfig := envoy_resource_v3.GetHTTPConnectionManager(filter); httpConfig != nil {
		// If the http filter uses RDS, then the clusters we need to validate exist in the route, and there's nothing
		// else we need to do with the filter.
		if rds := httpConfig.GetRds(); rds != nil {
			v.usesRDS = true

			// Edit the runtime configuration to add an envoy ID based on the route name in the filter. This is because
			// routes are matched by envoyID and in the transparent proxy case, we only have the VIP set in the
			// RuntimeConfig.
			v.envoyID = rds.RouteConfigName
			emptyServiceKey := api.CompoundServiceName{}
			if upstream, ok := config.Upstreams[emptyServiceKey]; ok {
				upstream.EnvoyID = v.envoyID
			}
			return filter, true, nil
		}
	}

	// FilterClusterNames handles the filter being an http or tcp filter.
	for sni := range extensioncommon.FilterClusterNames(filter) {
		// Mark any clusters we see as required resources.
		v.markRequired(sni)
	}
	return filter, true, nil
}

// markRequired marks the resource for sni as required, creating it when it has not been seen yet.
func (v *Validate) markRequired(sni string) {
	if r, ok := v.resources[sni]; ok {
		r.required = true
		return
	}
	v.resources[sni] = &resource{required: true}
}

// isAggregateCluster returns c's aggregate cluster configuration and true when c uses a custom cluster type
// (Cluster_ClusterType) whose typed config unmarshals as an aggregate ClusterConfig; otherwise it returns nil, false.
func isAggregateCluster(c *envoy_cluster_v3.Cluster) (*envoy_aggregate_cluster_v3.ClusterConfig, bool) {
	cdt, ok := c.ClusterDiscoveryType.(*envoy_cluster_v3.Cluster_ClusterType)
	if !ok {
		return nil, false
	}

	typedConfig := cdt.ClusterType.GetTypedConfig()
	if typedConfig == nil {
		return nil, false
	}

	// UnmarshalTo fails when the Any holds a different message type, so non-aggregate custom clusters are rejected.
	aggregateCluster := &envoy_aggregate_cluster_v3.ClusterConfig{}
	if err := anypb.UnmarshalTo(typedConfig, aggregateCluster, proto.UnmarshalOptions{}); err != nil {
		return nil, false
	}
	return aggregateCluster, true
}