open-consul/agent/proxycfg-sources/local/sync.go
R.B. Boyer b089f93292
proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher (#16497)
Receiving an "acl not found" error from an RPC in the agent cache and the
streaming/event components will cause any request loops to cease under the
assumption that they will never work again if the token was destroyed. This
prevents log spam (#14144, #9738).

Unfortunately, due to things like:

- authz requests going to stale servers that may not have witnessed the token
  creation yet

- authz requests in a secondary datacenter happening before the tokens get
  replicated to that datacenter

- authz requests from a primary TO a secondary datacenter happening before the
  tokens get replicated to that datacenter

the caller can get an "acl not found" error *before* the token exists, rather
than only after it has been destroyed. The machinery added in the PRs linked
above will then kick in and prevent the request loop from looping around again,
even once the tokens actually exist.

For `consul-dataplane` usages, where xDS is served by the Consul servers
rather than the clients, this is ultimately not a problem, because in that
scenario the `agent/proxycfg` machinery is on-demand and launched by a new xDS
stream needing data for a specific service in the catalog. If the watching
goroutines are terminated, that ripples down and terminates the xDS stream,
which consul-dataplane will eventually re-establish, restarting everything.

For Consul client usages, the `agent/proxycfg` machinery is launched
ahead-of-time at service registration (called "local" in some of the proxycfg
machinery), so when the xDS stream comes in the data is already ready to go. If
the watching goroutines terminate, the xDS stream should terminate as well, but
there is no mechanism to re-spawn the watching goroutines. If the xDS stream
reconnects, it will see no `ConfigSnapshot` and will not get one until the
client agent is restarted or the service is re-registered with something
changed in it.

This PR fixes a few things in the machinery:

- there was an inadvertent deadlock when xDS fetched a snapshot from the
  proxycfg machinery, such that when the watching goroutine terminated the
  snapshot would never be fetched. This caused some of the xDS machinery to
  pause indefinitely and never finish its teardown properly.

- Every 30s we now attempt to re-insert all locally registered services into
  the proxycfg machinery.

- When services are re-inserted into the proxycfg machinery, we special-case
  "dead" ones such that we unilaterally replace them rather than doing so
  conditionally (see the sketch below).
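
As a rough illustration of that last point, here is a minimal sketch of the
re-insertion special case. The `registry` and `watcher` types are hypothetical
stand-ins, not Consul's actual proxycfg Manager internals, so treat this as an
outline of the behaviour rather than the real implementation:

// Hypothetical types for illustration only; not the real proxycfg Manager state.
type watcher struct {
	def    string        // the registered service definition (simplified)
	doneCh chan struct{} // closed when the watching goroutine exits
}

func (w *watcher) dead() bool {
	select {
	case <-w.doneCh:
		return true
	default:
		return false
	}
}

type registry struct {
	watchers map[string]*watcher
}

// reinsert mirrors the behaviour described above: a healthy watcher with an
// unchanged definition is left alone, but a "dead" watcher is replaced
// unconditionally so that a reconnecting xDS stream can obtain a fresh snapshot.
func (r *registry) reinsert(id, newDef string) {
	if existing, ok := r.watchers[id]; ok && !existing.dead() && existing.def == newDef {
		return
	}
	r.watchers[id] = &watcher{def: newDef, doneCh: make(chan struct{})}
}
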
2023-03-03 14:27:53 -06:00

package local

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
)

const resyncFrequency = 30 * time.Second

const source proxycfg.ProxySource = "local"

// SyncConfig contains the dependencies required by Sync.
type SyncConfig struct {
	// Manager is the proxycfg Manager with which proxy services will be registered.
	Manager ConfigManager

	// State is the agent's local state that will be watched for proxy registrations.
	State *local.State

	// Tokens is used to retrieve a fallback ACL token if a service is registered
	// without one.
	Tokens *token.Store

	// NodeName is the name of the local agent node.
	NodeName string

	// Logger will be used to write log messages.
	Logger hclog.Logger

	// ResyncFrequency is how often to do a resync and recreate any terminated
	// watches.
	ResyncFrequency time.Duration
}
// Sync watches the agent's local state and registers/deregisters services with
// the proxycfg Manager ahead-of-time so they're ready immediately when a proxy
// begins an xDS stream.
//
// It runs until the given context is canceled, so it should be called in its
// own goroutine.
//
// Note: proxy service definitions from the agent's local state will always
// overwrite definitions of the same service from other sources (e.g. the
// catalog).
func Sync(ctx context.Context, cfg SyncConfig) {
	// Single item buffer is enough since there is no data transferred so this is
	// "level triggering" and we can't miss actual data.
	stateCh := make(chan struct{}, 1)
	cfg.State.Notify(stateCh)
	defer cfg.State.StopNotify(stateCh)

	var resyncCh <-chan time.Time
	for {
		sync(cfg)

		if resyncCh == nil && cfg.ResyncFrequency > 0 {
			resyncCh = time.After(cfg.ResyncFrequency)
		}

		select {
		case <-stateCh:
			// Wait for a state change.
		case <-resyncCh:
			resyncCh = nil
		case <-ctx.Done():
			return
		}
	}
}
func sync(cfg SyncConfig) {
	cfg.Logger.Trace("syncing proxy services from local state")

	services := cfg.State.AllServices()

	// Traverse the local state and ensure all proxy services are registered
	for sid, svc := range services {
		if !svc.Kind.IsProxy() {
			continue
		}

		// Retrieve the token used to register the service, or fallback to the
		// default user token. This token is expected to match the token used in
		// the xDS request for this data.
		token := cfg.State.ServiceToken(sid)
		if token == "" {
			token = cfg.Tokens.UserToken()
		}

		id := proxycfg.ProxyID{
			ServiceID: sid,
			NodeName:  cfg.NodeName,

			// Note: we *intentionally* don't set Token here. All watches on local
			// services use the same ACL token, regardless of whatever token is
			// presented in the xDS stream (the token presented to the xDS server
			// is checked before the watch is created).
			Token: "",
		}

		// TODO(banks): need to work out when to default some stuff. For example
		// Proxy.LocalServicePort is practically necessary for any sidecar and can
		// default to the port of the sidecar service, but only if it's already
		// registered and once we get past here, we don't have enough context to
		// know that so we'd need to set it here if not during registration of the
		// proxy service. Sidecar Service in the interim can do that, but we should
		// validate more generally that that is always true.
		err := cfg.Manager.Register(id, svc, source, token, true)
		if err != nil {
			cfg.Logger.Error("failed to watch proxy service",
				"service", sid.String(),
				"error", err,
			)
		}
	}

	// Now see if any proxies were removed
	for _, proxyID := range cfg.Manager.RegisteredProxies(source) {
		if _, ok := services[proxyID.ServiceID]; !ok {
			cfg.Manager.Deregister(proxyID, source)
		}
	}
}
//go:generate mockery --name ConfigManager --inpackage

// ConfigManager is the subset of the proxycfg Manager's API used here to
// register, deregister, watch, and enumerate proxy services.
type ConfigManager interface {
	Watch(id proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc)
	Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
	Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
	RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID
}
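
As a usage note, the doc comment on Sync says it should run in its own
goroutine until the context is canceled. A minimal sketch of that wiring is
below; the helper name startLocalSync and the "proxycfg.local-sync" logger name
are illustrative assumptions, not code that exists in Consul:

// startLocalSync is a hypothetical helper (not part of the real package) that
// shows how a caller could wire Sync into its lifecycle.
func startLocalSync(ctx context.Context, mgr ConfigManager, state *local.State,
	tokens *token.Store, nodeName string, logger hclog.Logger) {
	go Sync(ctx, SyncConfig{
		Manager:         mgr,
		State:           state,
		Tokens:          tokens,
		NodeName:        nodeName,
		Logger:          logger.Named("proxycfg.local-sync"), // illustrative logger name
		ResyncFrequency: resyncFrequency,                     // the package default of 30s
	})
}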