3804677570
Starting from and extending the mechanism introduced in #12110, we can specially handle the 3 main special Consul RPC endpoints that react to many config entries in a single blocking query in Connect:

- `DiscoveryChain.Get`
- `ConfigEntry.ResolveServiceConfig`
- `Intentions.Match`

All of these will internally watch for many config entries, and at least one of those will likely not be found in any given query. Because these are blends of multiple reads, the exact solution from #12110 isn't perfectly aligned, but we can tweak the approach slightly and regain the utility of that mechanism.

### No Config Entries Found

In this case, despite looking for many config entries, none may be found at all. Unlike #12110, in this scenario we do not return an empty reply to the caller, but instead synthesize a struct from default values to return. This can be handled nearly identically to #12110, with the first 1-2 replies being non-empty payloads followed by the standard spurious wakeup suppression mechanism from #12110.

### No Change Since Last Wakeup

Once a blocking query loop on the server has completed and slept at least once, there is a further optimization we can make here: detect whether the config entries that were present at specific versions for the prior execution of the loop are identical for the loop we just woke up for. In that scenario we can return a slightly different internal sentinel error and basically externally handle it similarly to #12110. This would mean that even if 20 discovery chain read RPC handling goroutines wake up due to the creation of an unrelated config entry, the only ones that will terminate and reply with a blob of data are those that genuinely have new data to report.

### Extra Endpoints

Since this pattern is pretty reusable, other key config-entry-adjacent endpoints used by `agent/proxycfg` were also updated:

- `ConfigEntry.List`
- `Internal.IntentionUpstreams` (tproxy)
782 lines
24 KiB
Go
782 lines
24 KiB
Go
package consul
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
"github.com/armon/go-metrics/prometheus"
|
|
"github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/mitchellh/copystructure"
|
|
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
"github.com/hashicorp/consul/agent/configentry"
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
var ConfigSummaries = []prometheus.SummaryDefinition{
|
|
{
|
|
Name: []string{"config_entry", "apply"},
|
|
Help: "",
|
|
},
|
|
{
|
|
Name: []string{"config_entry", "get"},
|
|
Help: "",
|
|
},
|
|
{
|
|
Name: []string{"config_entry", "list"},
|
|
Help: "",
|
|
},
|
|
{
|
|
Name: []string{"config_entry", "listAll"},
|
|
Help: "",
|
|
},
|
|
{
|
|
Name: []string{"config_entry", "delete"},
|
|
Help: "",
|
|
},
|
|
{
|
|
Name: []string{"config_entry", "resolve_service_config"},
|
|
Help: "",
|
|
},
|
|
}
|
|
|
|
// The ConfigEntry endpoint is used to query centralized config information
|
|
type ConfigEntry struct {
|
|
srv *Server
|
|
logger hclog.Logger
|
|
}
|
|
|
|
// Apply does an upsert of the given config entry.
|
|
func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error {
|
|
if err := c.srv.validateEnterpriseRequest(args.Entry.GetEnterpriseMeta(), true); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := gateWriteToSecondary(args.Datacenter, c.srv.config.Datacenter, c.srv.config.PrimaryDatacenter, args.Entry.GetKind())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure that all config entry writes go to the primary datacenter. These will then
|
|
// be replicated to all the other datacenters.
|
|
args.Datacenter = c.srv.config.PrimaryDatacenter
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now())
|
|
|
|
entMeta := args.Entry.GetEnterpriseMeta()
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, entMeta, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := c.preflightCheck(args.Entry.GetKind()); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Normalize and validate the incoming config entry as if it came from a user.
|
|
if err := args.Entry.Normalize(); err != nil {
|
|
return err
|
|
}
|
|
if err := args.Entry.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !args.Entry.CanWrite(authz) {
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
if args.Op != structs.ConfigEntryUpsert && args.Op != structs.ConfigEntryUpsertCAS {
|
|
args.Op = structs.ConfigEntryUpsert
|
|
}
|
|
|
|
if skip, err := c.shouldSkipOperation(args); err != nil {
|
|
return err
|
|
} else if skip {
|
|
*reply = true
|
|
return nil
|
|
}
|
|
|
|
resp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if respBool, ok := resp.(bool); ok {
|
|
*reply = respBool
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// shouldSkipOperation returns true if the result of the operation has
|
|
// already happened and is safe to skip.
|
|
//
|
|
// It is ok if this incorrectly detects something as changed when it
|
|
// in fact has not, the important thing is that it doesn't do
|
|
// the reverse and incorrectly detect a change as a no-op.
|
|
func (c *ConfigEntry) shouldSkipOperation(args *structs.ConfigEntryRequest) (bool, error) {
|
|
state := c.srv.fsm.State()
|
|
_, currentEntry, err := state.ConfigEntry(nil, args.Entry.GetKind(), args.Entry.GetName(), args.Entry.GetEnterpriseMeta())
|
|
if err != nil {
|
|
return false, fmt.Errorf("error reading current config entry value: %w", err)
|
|
}
|
|
|
|
switch args.Op {
|
|
case structs.ConfigEntryUpsert, structs.ConfigEntryUpsertCAS:
|
|
return c.shouldSkipUpsertOperation(currentEntry, args.Entry)
|
|
case structs.ConfigEntryDelete, structs.ConfigEntryDeleteCAS:
|
|
return (currentEntry == nil), nil
|
|
default:
|
|
return false, fmt.Errorf("invalid config entry operation type: %v", args.Op)
|
|
}
|
|
}
|
|
|
|
func (c *ConfigEntry) shouldSkipUpsertOperation(currentEntry, updatedEntry structs.ConfigEntry) (bool, error) {
|
|
if currentEntry == nil {
|
|
return false, nil
|
|
}
|
|
|
|
if currentEntry.GetKind() != updatedEntry.GetKind() ||
|
|
currentEntry.GetName() != updatedEntry.GetName() ||
|
|
!currentEntry.GetEnterpriseMeta().IsSame(updatedEntry.GetEnterpriseMeta()) {
|
|
return false, nil
|
|
}
|
|
|
|
// The only reason a fully Normalized and Validated config entry may
|
|
// legitimately differ from the persisted one is due to the embedded
|
|
// RaftIndex.
|
|
//
|
|
// So, to intercept more no-op upserts we temporarily set the new config
|
|
// entry's raft index field to that of the existing data for the purposes
|
|
// of comparison, and then restore it.
|
|
var (
|
|
currentRaftIndex = currentEntry.GetRaftIndex()
|
|
userProvidedRaftIndex = updatedEntry.GetRaftIndex()
|
|
|
|
currentRaftIndexCopy = *currentRaftIndex
|
|
userProvidedRaftIndexCopy = *userProvidedRaftIndex
|
|
)
|
|
|
|
*userProvidedRaftIndex = currentRaftIndexCopy // change
|
|
same := reflect.DeepEqual(currentEntry, updatedEntry) // compare
|
|
*userProvidedRaftIndex = userProvidedRaftIndexCopy // restore
|
|
|
|
return same, nil
|
|
}
|
|
|
|
// Get returns a single config entry by Kind/Name.
|
|
func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigEntryResponse) error {
|
|
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now())
|
|
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a dummy config entry to check the ACL permissions.
|
|
lookupEntry, err := structs.MakeConfigEntry(args.Kind, args.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lookupEntry.GetEnterpriseMeta().Merge(&args.EnterpriseMeta)
|
|
|
|
if !lookupEntry.CanRead(authz) {
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
return c.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
index, entry, err := state.ConfigEntry(ws, args.Kind, args.Name, &args.EnterpriseMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reply.Index, reply.Entry = index, entry
|
|
if entry == nil {
|
|
return errNotFound
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// List returns all the config entries of the given kind. If Kind is blank,
|
|
// all existing config entries will be returned.
|
|
func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.IndexedConfigEntries) error {
|
|
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now())
|
|
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if args.Kind != "" {
|
|
if _, err := structs.MakeConfigEntry(args.Kind, ""); err != nil {
|
|
return fmt.Errorf("invalid config entry kind: %s", args.Kind)
|
|
}
|
|
}
|
|
|
|
var (
|
|
priorHash uint64
|
|
ranOnce bool
|
|
)
|
|
return c.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
index, entries, err := state.ConfigEntriesByKind(ws, args.Kind, &args.EnterpriseMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Filter the entries returned by ACL permissions.
|
|
filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
|
|
for _, entry := range entries {
|
|
if !entry.CanRead(authz) {
|
|
reply.QueryMeta.ResultsFilteredByACLs = true
|
|
continue
|
|
}
|
|
filteredEntries = append(filteredEntries, entry)
|
|
}
|
|
|
|
reply.Kind = args.Kind
|
|
reply.Index = index
|
|
reply.Entries = filteredEntries
|
|
|
|
// Generate a hash of the content driving this response. Use it to
|
|
// determine if the response is identical to a prior wakeup.
|
|
newHash, err := hashstructure_v2.Hash(filteredEntries, hashstructure_v2.FormatV2, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
|
}
|
|
|
|
if ranOnce && priorHash == newHash {
|
|
priorHash = newHash
|
|
return errNotChanged
|
|
} else {
|
|
priorHash = newHash
|
|
ranOnce = true
|
|
}
|
|
|
|
if len(reply.Entries) == 0 {
|
|
return errNotFound
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
var configEntryKindsFromConsul_1_8_0 = []string{
|
|
structs.ServiceDefaults,
|
|
structs.ProxyDefaults,
|
|
structs.ServiceRouter,
|
|
structs.ServiceSplitter,
|
|
structs.ServiceResolver,
|
|
structs.IngressGateway,
|
|
structs.TerminatingGateway,
|
|
}
|
|
|
|
// ListAll returns all the known configuration entries
|
|
func (c *ConfigEntry) ListAll(args *structs.ConfigEntryListAllRequest, reply *structs.IndexedGenericConfigEntries) error {
|
|
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now())
|
|
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(args.Kinds) == 0 {
|
|
args.Kinds = configEntryKindsFromConsul_1_8_0
|
|
}
|
|
|
|
kindMap := make(map[string]struct{})
|
|
for _, kind := range args.Kinds {
|
|
kindMap[kind] = struct{}{}
|
|
}
|
|
|
|
return c.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
index, entries, err := state.ConfigEntries(ws, &args.EnterpriseMeta)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Filter the entries returned by ACL permissions or by the provided kinds.
|
|
filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
|
|
for _, entry := range entries {
|
|
if !entry.CanRead(authz) {
|
|
reply.QueryMeta.ResultsFilteredByACLs = true
|
|
continue
|
|
}
|
|
// Doing this filter outside of memdb isn't terribly
|
|
// performant. This kind filter is currently only used across
|
|
// version upgrades, so in the common case we are going to
|
|
// always return all of the data anyway, so it should be fine.
|
|
// If that changes at some point, then we should move this down
|
|
// into memdb.
|
|
if _, ok := kindMap[entry.GetKind()]; !ok {
|
|
continue
|
|
}
|
|
filteredEntries = append(filteredEntries, entry)
|
|
}
|
|
|
|
reply.Entries = filteredEntries
|
|
reply.Index = index
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Delete deletes a config entry.
|
|
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *structs.ConfigEntryDeleteResponse) error {
|
|
if err := c.srv.validateEnterpriseRequest(args.Entry.GetEnterpriseMeta(), true); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure that all config entry writes go to the primary datacenter. These will then
|
|
// be replicated to all the other datacenters.
|
|
args.Datacenter = c.srv.config.PrimaryDatacenter
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now())
|
|
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, args.Entry.GetEnterpriseMeta(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := c.preflightCheck(args.Entry.GetKind()); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Normalize the incoming entry.
|
|
if err := args.Entry.Normalize(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !args.Entry.CanWrite(authz) {
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
// Only delete and delete-cas ops are supported. If the caller erroneously
|
|
// sent something else, we assume they meant delete.
|
|
switch args.Op {
|
|
case structs.ConfigEntryDelete, structs.ConfigEntryDeleteCAS:
|
|
default:
|
|
args.Op = structs.ConfigEntryDelete
|
|
}
|
|
|
|
if skip, err := c.shouldSkipOperation(args); err != nil {
|
|
return err
|
|
} else if skip {
|
|
reply.Deleted = true
|
|
return nil
|
|
}
|
|
|
|
rsp, err := c.srv.raftApply(structs.ConfigEntryRequestType, args)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if args.Op == structs.ConfigEntryDeleteCAS {
|
|
// In CAS deletions the FSM will return a boolean value indicating whether the
|
|
// operation was successful.
|
|
deleted, _ := rsp.(bool)
|
|
reply.Deleted = deleted
|
|
} else {
|
|
// For non-CAS deletions any non-error result indicates a successful deletion.
|
|
reply.Deleted = true
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ResolveServiceConfig
|
|
func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, reply *structs.ServiceConfigResponse) error {
|
|
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, reply); done {
|
|
return err
|
|
}
|
|
defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now())
|
|
|
|
var authzContext acl.AuthorizerContext
|
|
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if authz.ServiceRead(args.Name, &authzContext) != acl.Allow {
|
|
return acl.ErrPermissionDenied
|
|
}
|
|
|
|
var (
|
|
priorHash uint64
|
|
ranOnce bool
|
|
)
|
|
return c.srv.blockingQuery(
|
|
&args.QueryOptions,
|
|
&reply.QueryMeta,
|
|
func(ws memdb.WatchSet, state *state.Store) error {
|
|
var (
|
|
upstreamIDs = args.UpstreamIDs
|
|
legacyUpstreams = false
|
|
)
|
|
|
|
// The request is considered legacy if the deprecated args.Upstream was used
|
|
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
|
|
legacyUpstreams = true
|
|
|
|
upstreamIDs = make([]structs.ServiceID, 0)
|
|
for _, upstream := range args.Upstreams {
|
|
// Before Consul namespaces were released, the Upstreams
|
|
// provided to the endpoint did not contain the namespace.
|
|
// Because of this we attach the enterprise meta of the
|
|
// request, which will just be the default namespace.
|
|
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
|
|
upstreamIDs = append(upstreamIDs, sid)
|
|
}
|
|
}
|
|
|
|
// Fetch all relevant config entries.
|
|
|
|
index, entries, err := state.ReadResolvedServiceConfigEntries(
|
|
ws,
|
|
args.Name,
|
|
&args.EnterpriseMeta,
|
|
upstreamIDs,
|
|
args.Mode,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Generate a hash of the config entry content driving this
|
|
// response. Use it to determine if the response is identical to a
|
|
// prior wakeup.
|
|
newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
|
}
|
|
|
|
if ranOnce && priorHash == newHash {
|
|
priorHash = newHash
|
|
reply.Index = index
|
|
// NOTE: the prior response is still alive inside of *reply, which
|
|
// is desirable
|
|
return errNotChanged
|
|
} else {
|
|
priorHash = newHash
|
|
ranOnce = true
|
|
}
|
|
|
|
thisReply, err := c.computeResolvedServiceConfig(
|
|
args,
|
|
upstreamIDs,
|
|
legacyUpstreams,
|
|
entries,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
thisReply.Index = index
|
|
|
|
*reply = *thisReply
|
|
if entries.IsEmpty() {
|
|
// No config entries factored into this reply; it's a default.
|
|
return errNotFound
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (c *ConfigEntry) computeResolvedServiceConfig(
|
|
args *structs.ServiceConfigRequest,
|
|
upstreamIDs []structs.ServiceID,
|
|
legacyUpstreams bool,
|
|
entries *configentry.ResolvedServiceConfigSet,
|
|
) (*structs.ServiceConfigResponse, error) {
|
|
var thisReply structs.ServiceConfigResponse
|
|
|
|
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
|
|
|
// TODO(freddy) Refactor this into smaller set of state store functions
|
|
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
|
// blocking query, this function will be rerun and these state store lookups will both be current.
|
|
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
|
var proxyConfGlobalProtocol string
|
|
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
|
if proxyConf != nil {
|
|
// Apply the proxy defaults to the sidecar's proxy config
|
|
mapCopy, err := copystructure.Copy(proxyConf.Config)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
|
}
|
|
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
|
thisReply.Mode = proxyConf.Mode
|
|
thisReply.TransparentProxy = proxyConf.TransparentProxy
|
|
thisReply.MeshGateway = proxyConf.MeshGateway
|
|
thisReply.Expose = proxyConf.Expose
|
|
|
|
// Extract the global protocol from proxyConf for upstream configs.
|
|
rawProtocol := proxyConf.Config["protocol"]
|
|
if rawProtocol != nil {
|
|
var ok bool
|
|
proxyConfGlobalProtocol, ok = rawProtocol.(string)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
|
|
}
|
|
}
|
|
}
|
|
|
|
serviceConf := entries.GetServiceDefaults(
|
|
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
|
|
)
|
|
if serviceConf != nil {
|
|
if serviceConf.Expose.Checks {
|
|
thisReply.Expose.Checks = true
|
|
}
|
|
if len(serviceConf.Expose.Paths) >= 1 {
|
|
thisReply.Expose.Paths = serviceConf.Expose.Paths
|
|
}
|
|
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
|
|
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
|
|
}
|
|
if serviceConf.Protocol != "" {
|
|
if thisReply.ProxyConfig == nil {
|
|
thisReply.ProxyConfig = make(map[string]interface{})
|
|
}
|
|
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
|
|
}
|
|
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
|
|
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
|
|
}
|
|
if serviceConf.TransparentProxy.DialedDirectly {
|
|
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
|
|
}
|
|
if serviceConf.Mode != structs.ProxyModeDefault {
|
|
thisReply.Mode = serviceConf.Mode
|
|
}
|
|
}
|
|
|
|
// First collect all upstreams into a set of seen upstreams.
|
|
// Upstreams can come from:
|
|
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
|
|
// - Implicitly from centralized upstream config in service-defaults
|
|
seenUpstreams := map[structs.ServiceID]struct{}{}
|
|
|
|
var (
|
|
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
|
|
|
|
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
|
|
// will never be transparent because the service config request does not use the resolved value.
|
|
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
|
|
)
|
|
|
|
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
|
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
|
|
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
|
if noUpstreamArgs && !tproxy {
|
|
return &thisReply, nil
|
|
}
|
|
|
|
// First store all upstreams that were provided in the request
|
|
for _, sid := range upstreamIDs {
|
|
if _, ok := seenUpstreams[sid]; !ok {
|
|
seenUpstreams[sid] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
|
var (
|
|
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
|
upstreamDefaults *structs.UpstreamConfig
|
|
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
|
usConfigs = make(map[structs.ServiceID]map[string]interface{})
|
|
)
|
|
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
|
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
|
if override.Name == "" {
|
|
c.logger.Warn(
|
|
"Skipping UpstreamConfig.Overrides entry without a required name field",
|
|
"entryIndex", i,
|
|
"kind", serviceConf.GetKind(),
|
|
"name", serviceConf.GetName(),
|
|
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
|
|
)
|
|
continue // skip this impossible condition
|
|
}
|
|
seenUpstreams[override.ServiceID()] = struct{}{}
|
|
upstreamConfigs[override.ServiceID()] = override
|
|
}
|
|
if serviceConf.UpstreamConfig.Defaults != nil {
|
|
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
|
|
|
// Store the upstream defaults under a wildcard key so that they can be applied to
|
|
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
|
cfgMap := make(map[string]interface{})
|
|
upstreamDefaults.MergeInto(cfgMap)
|
|
|
|
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
|
usConfigs[wildcard] = cfgMap
|
|
}
|
|
}
|
|
|
|
for upstream := range seenUpstreams {
|
|
resolvedCfg := make(map[string]interface{})
|
|
|
|
// The protocol of an upstream is resolved in this order:
|
|
// 1. Default protocol from proxy-defaults (how all services should be addressed)
|
|
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
|
|
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
|
|
// (how the downstream wants to address it)
|
|
protocol := proxyConfGlobalProtocol
|
|
|
|
upstreamSvcDefaults := entries.GetServiceDefaults(
|
|
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
|
|
)
|
|
if upstreamSvcDefaults != nil {
|
|
if upstreamSvcDefaults.Protocol != "" {
|
|
protocol = upstreamSvcDefaults.Protocol
|
|
}
|
|
}
|
|
|
|
if protocol != "" {
|
|
resolvedCfg["protocol"] = protocol
|
|
}
|
|
|
|
// Merge centralized defaults for all upstreams before configuration for specific upstreams
|
|
if upstreamDefaults != nil {
|
|
upstreamDefaults.MergeInto(resolvedCfg)
|
|
}
|
|
|
|
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
|
|
// because it is specific to the proxy instance.
|
|
//
|
|
// The goal is to flatten the mesh gateway mode in this order:
|
|
// 0. Value from centralized upstream_defaults
|
|
// 1. Value from local proxy registration
|
|
// 2. Value from centralized upstream_config
|
|
// 3. Value from local upstream definition. This last step is done in the client's service manager.
|
|
if !args.MeshGateway.IsZero() {
|
|
resolvedCfg["mesh_gateway"] = args.MeshGateway
|
|
}
|
|
|
|
if upstreamConfigs[upstream] != nil {
|
|
upstreamConfigs[upstream].MergeInto(resolvedCfg)
|
|
}
|
|
|
|
if len(resolvedCfg) > 0 {
|
|
usConfigs[upstream] = resolvedCfg
|
|
}
|
|
}
|
|
|
|
// don't allocate the slices just to not fill them
|
|
if len(usConfigs) == 0 {
|
|
return &thisReply, nil
|
|
}
|
|
|
|
if legacyUpstreams {
|
|
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
|
|
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
|
|
|
|
for us, conf := range usConfigs {
|
|
thisReply.UpstreamConfigs[us.ID] = conf
|
|
}
|
|
|
|
} else {
|
|
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
|
|
|
|
for us, conf := range usConfigs {
|
|
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
|
|
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
|
|
}
|
|
}
|
|
|
|
return &thisReply, nil
|
|
}
|
|
|
|
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
|
|
// ExportedServices entries are gated from interactions from secondary DCs
|
|
// because non-default partitions cannot be created in secondaries
|
|
// and services cannot be exported to another datacenter.
|
|
if kind != structs.ExportedServices {
|
|
return nil
|
|
}
|
|
if localDC == "" {
|
|
// This should not happen because the datacenter is defaulted in DefaultConfig.
|
|
return fmt.Errorf("unknown local datacenter")
|
|
}
|
|
|
|
if primaryDC == "" {
|
|
primaryDC = localDC
|
|
}
|
|
|
|
switch {
|
|
case targetDC == "" && localDC != primaryDC:
|
|
return fmt.Errorf("exported-services writes in secondary datacenters must target the primary datacenter explicitly.")
|
|
|
|
case targetDC != "" && targetDC != primaryDC:
|
|
return fmt.Errorf("exported-services writes must not target secondary datacenters.")
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// preflightCheck is meant to have kind-specific system validation outside of
|
|
// content validation. The initial use case is restricting the ability to do
|
|
// writes of service-intentions until the system is finished migration.
|
|
func (c *ConfigEntry) preflightCheck(kind string) error {
|
|
switch kind {
|
|
case structs.ServiceIntentions:
|
|
// Exit early if Connect hasn't been enabled.
|
|
if !c.srv.config.ConnectEnabled {
|
|
return ErrConnectNotEnabled
|
|
}
|
|
|
|
usingConfigEntries, err := c.srv.fsm.State().AreIntentionsInConfigEntries()
|
|
if err != nil {
|
|
return fmt.Errorf("system metadata lookup failed: %v", err)
|
|
}
|
|
if !usingConfigEntries {
|
|
return ErrIntentionsNotUpgradedYet
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|