merge feedback: fix typos; actually use deliverLatest added previously but not plumbed in

This commit is contained in:
Paul Banks 2018-10-09 17:57:26 +01:00
parent bcf2cc2de3
commit 0523efa2fe
17 changed files with 81 additions and 84 deletions

1
.gitignore vendored
View File

@ -22,3 +22,4 @@ website/.bundle
website/build/ website/build/
website/npm-debug.log website/npm-debug.log
website/vendor website/vendor
.netlify

View File

@ -207,7 +207,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
as.ProxyDestination = as.Proxy.DestinationServiceName as.ProxyDestination = as.Proxy.DestinationServiceName
} }
// Attach Connect configs if the exist. We use the actual proxy state since // Attach Connect configs if they exist. We use the actual proxy state since
// that may have had defaults filled in compared to the config that was // that may have had defaults filled in compared to the config that was
// provided with the service as stored in the NodeService here. // provided with the service as stored in the NodeService here.
if proxy, ok := proxies[id+"-proxy"]; ok { if proxy, ok := proxies[id+"-proxy"]; ok {
@ -861,7 +861,7 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
Reason: fmt.Sprintf("Invalid SidecarService: %s", err)} Reason: fmt.Sprintf("Invalid SidecarService: %s", err)}
} }
if sidecar != nil { if sidecar != nil {
// Make sure we are allowed to register the side car using the token // Make sure we are allowed to register the sidecar using the token
// specified (might be specific to sidecar or the same one as the overall // specified (might be specific to sidecar or the same one as the overall
// request). // request).
if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil { if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil {

View File

@ -30,8 +30,8 @@ func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout reqReal.QueryOptions.MaxQueryTime = opts.Timeout
// Allways allow stale - there's no point in hitting leader if the request is // Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and endup arbitrarily stale anyway. This // going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all // allows cached service-discover to automatically read scale across all
// servers too. // servers too.
reqReal.AllowStale = true reqReal.AllowStale = true

View File

@ -22,7 +22,7 @@ type UpdateEvent struct {
// Notify registers a desire to be updated about changes to a cache result. // Notify registers a desire to be updated about changes to a cache result.
// //
// It is a helper that abstracts code from perfroming their own "blocking" query // It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in // logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the // cache actively. It will continue to perform blocking Get requests until the
// context is canceled. // context is canceled.
@ -38,13 +38,13 @@ type UpdateEvent struct {
// drained, watching resumes correctly. If the pause is longer than the // drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in // cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch // this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behaviour transparently. // the entry from servers and resume notification behavior transparently.
// //
// The chan is passed in to allow multiple cached results to be watched by a // The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The // single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by // correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any // result of watching the specified request so the caller can set this to any
// value that allows them to dissambiguate between events in the returned chan // value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed, // when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate. // the notify loop will terminate.
func (c *Cache) Notify(ctx context.Context, t string, r Request, func (c *Cache) Notify(ctx context.Context, t string, r Request,

View File

@ -1200,7 +1200,7 @@ func (b *Builder) serviceConnectVal(v *ServiceConnect) *structs.ServiceConnect {
if sidecar != nil { if sidecar != nil {
// Sanity checks // Sanity checks
if sidecar.ID != "" { if sidecar.ID != "" {
b.err = multierror.Append(b.err, fmt.Errorf("sidecar_service can't speficy an ID")) b.err = multierror.Append(b.err, fmt.Errorf("sidecar_service can't specify an ID"))
sidecar.ID = "" sidecar.ID = ""
} }
if sidecar.Connect != nil { if sidecar.Connect != nil {

View File

@ -665,7 +665,7 @@ type RuntimeConfig struct {
EncryptVerifyOutgoing bool EncryptVerifyOutgoing bool
// GRPCPort is the port the gRPC server listens on. Currently this only // GRPCPort is the port the gRPC server listens on. Currently this only
// exposes the xDS and ext_authx APIs for Envoy and it disabled by default. // exposes the xDS and ext_authz APIs for Envoy and it is disabled by default.
// //
// hcl: ports { grpc = int } // hcl: ports { grpc = int }
// flags: -grpc-port int // flags: -grpc-port int

View File

@ -1908,7 +1908,7 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
} }
} }
`}, `},
err: "sidecar_service can't speficy an ID", err: "sidecar_service can't specify an ID",
}, },
{ {
desc: "sidecar_service can't have nested sidecar", desc: "sidecar_service can't have nested sidecar",

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"log" "log"
"sync" "sync"
"time"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/local"
@ -112,38 +111,7 @@ func (m *Manager) Run() error {
defer m.State.StopNotify(stateCh) defer m.State.StopNotify(stateCh)
for { for {
m.mu.Lock() m.syncState()
// Traverse the local state and ensure all proxy services are registered
services := m.State.Services()
for svcID, svc := range services {
if svc.Kind != structs.ServiceKindConnectProxy {
continue
}
// TODO(banks): need to work out when to default some stuff. For example
// Proxy.LocalServicePort is practically necessary for any sidecar and can
// default to the port of the sidecar service, but only if it's already
// registered and once we get past here, we don't have enough context to
// know that so we'd need to set it here if not during registration of the
// proxy service. Sidecar Service and managed proxies in the interim can
// do that, but we should validate more generally that that is always
// true.
err := m.ensureProxyServiceLocked(svc, m.State.ServiceToken(svcID))
if err != nil {
m.Logger.Printf("[ERR] failed to watch proxy service %s: %s", svc.ID,
err)
}
}
// Now see if any proxies were removed
for proxyID := range m.proxies {
if _, ok := services[proxyID]; !ok {
// Remove them
m.removeProxyServiceLocked(proxyID)
}
}
m.mu.Unlock()
// Wait for a state change // Wait for a state change
_, ok := <-stateCh _, ok := <-stateCh
@ -154,6 +122,42 @@ func (m *Manager) Run() error {
} }
} }
// syncState is called whenever the local state notifies a change. It holds the
// lock while finding any new or updated proxies and removing deleted ones.
func (m *Manager) syncState() {
m.mu.Lock()
defer m.mu.Unlock()
// Traverse the local state and ensure all proxy services are registered
services := m.State.Services()
for svcID, svc := range services {
if svc.Kind != structs.ServiceKindConnectProxy {
continue
}
// TODO(banks): need to work out when to default some stuff. For example
// Proxy.LocalServicePort is practically necessary for any sidecar and can
// default to the port of the sidecar service, but only if it's already
// registered and once we get past here, we don't have enough context to
// know that so we'd need to set it here if not during registration of the
// proxy service. Sidecar Service and managed proxies in the interim can
// do that, but we should validate more generally that that is always
// true.
err := m.ensureProxyServiceLocked(svc, m.State.ServiceToken(svcID))
if err != nil {
m.Logger.Printf("[ERR] failed to watch proxy service %s: %s", svc.ID,
err)
}
}
// Now see if any proxies were removed
for proxyID := range m.proxies {
if _, ok := services[proxyID]; !ok {
// Remove them
m.removeProxyServiceLocked(proxyID)
}
}
}
// ensureProxyServiceLocked adds or changes the proxy to our state. // ensureProxyServiceLocked adds or changes the proxy to our state.
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string) error { func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string) error {
state, ok := m.proxies[ns.ID] state, ok := m.proxies[ns.ID]
@ -226,15 +230,7 @@ func (m *Manager) notify(snap *ConfigSnapshot) {
} }
for _, ch := range watchers { for _, ch := range watchers {
// Attempt delivery but don't let slow consumers block us forever. They m.deliverLatest(snap, ch)
// might miss updates but it's better than breaking everything.
//
// TODO(banks): should we close their chan here to force them to eventually
// notice they are too slow? Not sure if it really helps.
select {
case ch <- snap:
case <-time.After(100 * time.Millisecond):
}
} }
} }

View File

@ -14,7 +14,7 @@ import (
) )
// TestCacheTypes encapsulates all the different cache types proxycfg.State will // TestCacheTypes encapsulates all the different cache types proxycfg.State will
// watch/request for contolling one during testing. // watch/request for controlling one during testing.
type TestCacheTypes struct { type TestCacheTypes struct {
roots *ControllableCacheType roots *ControllableCacheType
leaf *ControllableCacheType leaf *ControllableCacheType
@ -68,7 +68,7 @@ func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
return c return c
} }
// TestCerts genereates a CA and Leaf suitable for returning as mock CA // TestCerts generates a CA and Leaf suitable for returning as mock CA
// root/leaf cache requests. // root/leaf cache requests.
func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) { func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) {
t.Helper() t.Helper()
@ -82,8 +82,8 @@ func TestCerts(t testing.T) (*structs.IndexedCARoots, *structs.IssuedCert) {
return roots, TestLeafForCA(t, ca) return roots, TestLeafForCA(t, ca)
} }
// TestLeafForCA genereates new Leaf suitable for returning as mock CA // TestLeafForCA generates new Leaf suitable for returning as mock CA
// leaf cache resonse, signed by an existing CA. // leaf cache response, signed by an existing CA.
func TestLeafForCA(t testing.T, ca *structs.CARoot) *structs.IssuedCert { func TestLeafForCA(t testing.T, ca *structs.CARoot) *structs.IssuedCert {
leafPEM, pkPEM := connect.TestLeaf(t, "web", ca) leafPEM, pkPEM := connect.TestLeaf(t, "web", ca)
@ -171,14 +171,14 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
} }
// ControllableCacheType is a cache.Type that simulates a typical blocking RPC // ControllableCacheType is a cache.Type that simulates a typical blocking RPC
// but lets us controll the responses and when they are deliverd easily. // but lets us control the responses and when they are delivered easily.
type ControllableCacheType struct { type ControllableCacheType struct {
index uint64 index uint64
value atomic.Value value atomic.Value
// Need a condvar to trigger all blocking requests (there might be multiple // Need a condvar to trigger all blocking requests (there might be multiple
// for same type due to background refresh and timing issues) when values // for same type due to background refresh and timing issues) when values
// change. Chans make it nondeterministic which one triggers or need extra // change. Chans make it nondeterministic which one triggers or need extra
// locking to coodrinate rplacing after close etc. // locking to coordinate replacing after close etc.
triggerMu sync.Mutex triggerMu sync.Mutex
trigger *sync.Cond trigger *sync.Cond
blocking bool blocking bool

View File

@ -50,7 +50,7 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
} }
// Flag this as a sidecar - this is not persisted in catalog but only needed // Flag this as a sidecar - this is not persisted in catalog but only needed
// in local agent state to disambiguate lineage when deregistereing the parent // in local agent state to disambiguate lineage when deregistering the parent
// service later. // service later.
sidecar.LocallyRegisteredAsSidecar = true sidecar.LocallyRegisteredAsSidecar = true
@ -88,13 +88,13 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
rangeLen := a.config.ConnectSidecarMaxPort - a.config.ConnectSidecarMinPort + 1 rangeLen := a.config.ConnectSidecarMaxPort - a.config.ConnectSidecarMinPort + 1
if sidecar.Port < 1 && a.config.ConnectSidecarMinPort > 0 && rangeLen > 0 { if sidecar.Port < 1 && a.config.ConnectSidecarMinPort > 0 && rangeLen > 0 {
// This did pick at random which was simpler but consul reload would assign // This did pick at random which was simpler but consul reload would assign
// new ports to all the sidecar since it unloads all state and re-populates. // new ports to all the sidecars since it unloads all state and
// It also made this more difficult to test (have to pin the range to one // re-populates. It also made this more difficult to test (have to pin the
// etc.). Instead we assign sequentially, but rather than N^2 lookups, just // range to one etc.). Instead we assign sequentially, but rather than N^2
// iterated services once and find the set of used ports in allocation // lookups, just iterated services once and find the set of used ports in
// range. We could maintain this state permenantly in agent but it doesn't // allocation range. We could maintain this state permanently in agent but
// seem to be necessary - even with thousands of services this is not // it doesn't seem to be necessary - even with thousands of services this is
// expensive to compute. // not expensive to compute.
usedPorts := make(map[int]struct{}) usedPorts := make(map[int]struct{})
for _, otherNS := range a.State.Services() { for _, otherNS := range a.State.Services() {
// Check if other port is in auto-assign range // Check if other port is in auto-assign range
@ -136,7 +136,7 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
// just know they explicitly disabled auto assignment. // just know they explicitly disabled auto assignment.
if a.config.ConnectSidecarMinPort < 1 || a.config.ConnectSidecarMaxPort < 1 { if a.config.ConnectSidecarMinPort < 1 || a.config.ConnectSidecarMaxPort < 1 {
return nil, nil, "", fmt.Errorf("no port provided for sidecar_service " + return nil, nil, "", fmt.Errorf("no port provided for sidecar_service " +
"and auto-assignement disabled in config") "and auto-assignment disabled in config")
} }
return nil, nil, "", fmt.Errorf("no port provided for sidecar_service and none "+ return nil, nil, "", fmt.Errorf("no port provided for sidecar_service and none "+
"left in the configured range [%d, %d]", a.config.ConnectSidecarMinPort, "left in the configured range [%d, %d]", a.config.ConnectSidecarMinPort,

View File

@ -165,7 +165,7 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
}, },
}, },
token: "foo", token: "foo",
wantErr: "auto-assignement disabled in config", wantErr: "auto-assignment disabled in config",
}, },
{ {
name: "invalid check type", name: "invalid check type",

View File

@ -33,7 +33,7 @@ type ServiceDefinition struct {
// Proxy is the configuration set for Kind = connect-proxy. It is mandatory in // Proxy is the configuration set for Kind = connect-proxy. It is mandatory in
// that case and an error to be set for any other kind. This config is part of // that case and an error to be set for any other kind. This config is part of
// a proxy service definition and is distinct from but shares some fields with // a proxy service definition and is distinct from but shares some fields with
// the Connect.Proxy which configures a manageged proxy as part of the actual // the Connect.Proxy which configures a managed proxy as part of the actual
// service's definition. This duplication is ugly but seemed better than the // service's definition. This duplication is ugly but seemed better than the
// alternative which was to re-use the same struct fields for both cases even // alternative which was to re-use the same struct fields for both cases even
// though the semantics are different and the non-shared fields make no sense // though the semantics are different and the non-shared fields make no sense

View File

@ -631,7 +631,7 @@ type NodeService struct {
// ProxyDestination is DEPRECATED in favor of Proxy.DestinationServiceName. // ProxyDestination is DEPRECATED in favor of Proxy.DestinationServiceName.
// It's retained since this struct is used to parse input for // It's retained since this struct is used to parse input for
// /catalog/register but nothing else internal should use it - once // /catalog/register but nothing else internal should use it - once
// request/config definitinos are passes all internal uses of NodeService // request/config definitions are passes all internal uses of NodeService
// should have this empty and use the Proxy.DestinationServiceNames field // should have this empty and use the Proxy.DestinationServiceNames field
// below. // below.
// //
@ -645,7 +645,7 @@ type NodeService struct {
// Proxy is the configuration set for Kind = connect-proxy. It is mandatory in // Proxy is the configuration set for Kind = connect-proxy. It is mandatory in
// that case and an error to be set for any other kind. This config is part of // that case and an error to be set for any other kind. This config is part of
// a proxy service definition and is distinct from but shares some fields with // a proxy service definition and is distinct from but shares some fields with
// the Connect.Proxy which configures a manageged proxy as part of the actual // the Connect.Proxy which configures a managed proxy as part of the actual
// service's definition. This duplication is ugly but seemed better than the // service's definition. This duplication is ugly but seemed better than the
// alternative which was to re-use the same struct fields for both cases even // alternative which was to re-use the same struct fields for both cases even
// though the semantics are different and the non-shred fields make no sense // though the semantics are different and the non-shred fields make no sense
@ -667,7 +667,7 @@ type NodeService struct {
// ID scheme as our sidecars do by default. We could use meta but that gets // ID scheme as our sidecars do by default. We could use meta but that gets
// unpleasant because we can't use the consul- prefix from an agent (reserved // unpleasant because we can't use the consul- prefix from an agent (reserved
// for use internally but in practice that means within the state store or in // for use internally but in practice that means within the state store or in
// responses only), and it leaks the detail publically which people might rely // responses only), and it leaks the detail publicly which people might rely
// on which is a bit unpleasant for something that is meant to be config-file // on which is a bit unpleasant for something that is meant to be config-file
// syntax sugar. Note this is not translated to ServiceNode and friends and // syntax sugar. Note this is not translated to ServiceNode and friends and
// may not be set on a NodeService that isn't the one the agent registered and // may not be set on a NodeService that isn't the one the agent registered and

View File

@ -8,23 +8,23 @@ import (
) )
func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) { func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) {
anys := make([]types.Any, len(resources)) anys := make([]types.Any, 0, len(resources))
for i, r := range resources { for _, r := range resources {
if r == nil { if r == nil {
continue continue
} }
if any, ok := r.(*types.Any); ok { if any, ok := r.(*types.Any); ok {
anys[i] = *any anys = append(anys, *any)
continue continue
} }
data, err := proto.Marshal(r) data, err := proto.Marshal(r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
anys[i] = types.Any{ anys = append(anys, types.Any{
TypeUrl: typeURL, TypeUrl: typeURL,
Value: data, Value: data,
} })
} }
resp := &envoy.DiscoveryResponse{ resp := &envoy.DiscoveryResponse{
VersionInfo: version, VersionInfo: version,

View File

@ -216,8 +216,8 @@ func (c *cmd) lookupProxyIDForSidecar(client *api.Client) (string, error) {
} }
// LookupProxyIDForSidecar finds candidate local proxy registrations that are a // LookupProxyIDForSidecar finds candidate local proxy registrations that are a
// sidcar for the given service. It will return an ID if and only if there is // sidecar for the given service. It will return an ID if and only if there is
// exactly one registed connect proxy with `Proxy.DestinationServiceID` set to // exactly one registered connect proxy with `Proxy.DestinationServiceID` set to
// the specified service ID. // the specified service ID.
// //
// This is exported to share it with the connect envoy command. // This is exported to share it with the connect envoy command.
@ -237,10 +237,10 @@ func LookupProxyIDForSidecar(client *api.Client, sidecarFor string) (string, err
} }
if len(proxyIDs) == 0 { if len(proxyIDs) == 0 {
return "", fmt.Errorf("No sidecar proxy registereded for %s", sidecarFor) return "", fmt.Errorf("No sidecar proxy registered for %s", sidecarFor)
} }
if len(proxyIDs) > 1 { if len(proxyIDs) > 1 {
return "", fmt.Errorf("More than one sidecar proxy registereded for %s.\n"+ return "", fmt.Errorf("More than one sidecar proxy registered for %s.\n"+
" Start proxy with -proxy-id and one of the following IDs: %s", " Start proxy with -proxy-id and one of the following IDs: %s",
sidecarFor, strings.Join(proxyIDs, ", ")) sidecarFor, strings.Join(proxyIDs, ", "))
} }

View File

@ -84,7 +84,7 @@ func TestCommandConfigWatcher(t *testing.T) {
"-sidecar-for", "two-sidecars", "-sidecar-for", "two-sidecars",
}, },
// Order is non-deterministic so don't assert the list of proxy IDs here // Order is non-deterministic so don't assert the list of proxy IDs here
WantErr: `More than one sidecar proxy registereded for two-sidecars. WantErr: `More than one sidecar proxy registered for two-sidecars.
Start proxy with -proxy-id and one of the following IDs: `, Start proxy with -proxy-id and one of the following IDs: `,
}, },

View File

@ -118,7 +118,7 @@ func (uc *UpstreamConfig) String() string {
} }
// UpstreamResolverFuncFromClient returns a closure that captures a consul // UpstreamResolverFuncFromClient returns a closure that captures a consul
// client and when called provides aConsulResolver that can resolve the given // client and when called provides a ConsulResolver that can resolve the given
// UpstreamConfig using the provided api.Client dependency. // UpstreamConfig using the provided api.Client dependency.
func UpstreamResolverFuncFromClient(client *api.Client) func(cfg UpstreamConfig) (connect.Resolver, error) { func UpstreamResolverFuncFromClient(client *api.Client) func(cfg UpstreamConfig) (connect.Resolver, error) {
return func(cfg UpstreamConfig) (connect.Resolver, error) { return func(cfg UpstreamConfig) (connect.Resolver, error) {
@ -141,7 +141,7 @@ func UpstreamResolverFuncFromClient(client *api.Client) func(cfg UpstreamConfig)
} }
// ConfigWatcher is a simple interface to allow dynamic configurations from // ConfigWatcher is a simple interface to allow dynamic configurations from
// plugggable sources. // pluggable sources.
type ConfigWatcher interface { type ConfigWatcher interface {
// Watch returns a channel that will deliver new Configs if something external // Watch returns a channel that will deliver new Configs if something external
// provokes it. // provokes it.