add a rate limiter to config auto-reload (#12490)

* add config watcher to the config package

* add logging to watcher

* add test and refactor to add WatcherEvent.

* add all API calls and fix a bug with recreated files

* add tests for watcher

* remove the unnecessary use of context

* Add debug log and a test for file rename

* use inode to detect if the file is recreated/replaced and only listen to create events.

* tidy ups (#1535)

* tidy ups

* Add tests for inode reconcile

* fix linux vs windows syscall

* fix linux vs windows syscall

* fix windows compile error

* increase timeout

* use ctime ID

* remove remove/creation test as it's a use case that fail in linux

* fix linux/windows to use Ino/CreationTime

* fix the watcher to only overwrite current file id

* fix linter error

* fix remove/create test

* set reconcile loop to 200 Milliseconds

* fix watcher to not trigger event on remove, add more tests

* on a remove event try to add the file back to the watcher and trigger the handler if success

* fix race condition

* fix flaky test

* fix race conditions

* set level to info

* fix when file is removed and get an event for it after

* fix to trigger handler when we get a remove but re-add fail

* fix error message

* add tests for directory watch and fixes

* detect if a file is a symlink and return an error on Add

* rename Watcher to FileWatcher and remove symlink deref

* add fsnotify@v1.5.1

* fix go mod

* do not reset timer on errors, rename OS specific files

* rename New func

* events trigger on write and rename

* add missing test

* fix flaking tests

* fix flaky test

* check reconcile when removed

* delete invalid file

* fix test to create files with different mod time.

* back date file instead of sleeping

* add watching file in agent command.

* fix watcher call to use new API

* add configuration and stop watcher when server stop

* add certs as watched files

* move FileWatcher to the agent start instead of the command code

* stop watcher before replacing it

* save watched files in agent

* add add and remove interfaces to the file watcher

* fix remove to not return an error

* use `Add` and `Remove` to update certs files

* fix tests

* close events channel on the file watcher even when the context is done

* extract `NotAutoReloadableRuntimeConfig` is a separate struct

* fix linter errors

* add Ca configs and outgoing verify to the not auto reloadable config

* add some logs and fix to use background context

* add tests to auto-config reload

* remove stale test

* add tests to changes to config files

* add check to see if old cert files still trigger updates

* rename `NotAutoReloadableRuntimeConfig` to `StaticRuntimeConfig`

* fix to re add both key and cert file. Add test to cover this case.

* review suggestion

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* add check to static runtime config changes

* fix test

* add changelog file

* fix review comments

* Apply suggestions from code review

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* update flag description

Co-authored-by: FFMMM <FFMMM@users.noreply.github.com>

* fix compilation error

* add static runtime config support

* fix test

* fix review comments

* fix log test

* Update .changelog/12329.txt

Co-authored-by: Dan Upton <daniel@floppy.co>

* transfer tests to runtime_test.go

* fix filewatcher Replace to not deadlock.

* avoid having lingering locks

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* split ReloadConfig func

* fix warning message

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* convert `FileWatcher` into an interface

* fix compilation errors

* fix tests

* extract func for adding and removing files

* add a coalesceTimer with a very small timer

* extract coaelsce Timer and add a shim for testing

* add tests to coalesceTimer fix to send remaining events

* set `coalesceTimer` to 1 Second

* support symlink, fix a nil deref.

* fix compile error

* fix compile error

* refactor file watcher rate limiting to be a Watcher implementation

* fix linter issue

* fix runtime config

* fix runtime test

* fix flaky tests

* fix compile error

* Apply suggestions from code review

Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>

* fix agent New to return an error if File watcher New return an error

* quit timer loop if ctx is canceled

* Apply suggestions from code review

Co-authored-by: Chris S. Kim <ckim@hashicorp.com>

Co-authored-by: Ashwin Venkatesh <ashwin@hashicorp.com>
Co-authored-by: R.B. Boyer <4903+rboyer@users.noreply.github.com>
Co-authored-by: FFMMM <FFMMM@users.noreply.github.com>
Co-authored-by: Daniel Upton <daniel@floppy.co>
Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
This commit is contained in:
Dhia Ayachi 2022-04-04 11:31:39 -04:00 committed by GitHub
parent d099eca725
commit cdcb249449
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 437 additions and 103 deletions

View File

@ -361,9 +361,9 @@ type Agent struct {
// run by the Agent
routineManager *routine.Manager
// FileWatcher is the watcher responsible to report events when a config file
// configFileWatcher is the watcher responsible to report events when a config file
// changed
FileWatcher config.Watcher
configFileWatcher config.Watcher
// xdsServer serves the XDS protocol for configuring Envoy proxies.
xdsServer *xds.Server
@ -462,6 +462,13 @@ func New(bd BaseDeps) (*Agent, error) {
a.baseDeps.WatchedFiles = append(a.baseDeps.WatchedFiles, f.Cfg.CertFile)
}
}
if a.baseDeps.RuntimeConfig.AutoReloadConfig && len(a.baseDeps.WatchedFiles) > 0 {
w, err := config.NewRateLimitedFileWatcher(a.baseDeps.WatchedFiles, a.baseDeps.Logger, a.baseDeps.RuntimeConfig.AutoReloadConfigCoalesceInterval)
if err != nil {
return nil, err
}
a.configFileWatcher = w
}
return &a, nil
}
@ -713,25 +720,20 @@ func (a *Agent) Start(ctx context.Context) error {
})
// start a go routine to reload config based on file watcher events
if a.baseDeps.RuntimeConfig.AutoReloadConfig && len(a.baseDeps.WatchedFiles) > 0 {
w, err := config.NewFileWatcher(a.baseDeps.WatchedFiles, a.baseDeps.Logger)
if err != nil {
a.baseDeps.Logger.Error("error loading config", "error", err)
} else {
a.FileWatcher = w
a.baseDeps.Logger.Debug("starting file watcher")
a.FileWatcher.Start(context.Background())
go func() {
for event := range a.FileWatcher.EventsCh() {
a.baseDeps.Logger.Debug("auto-reload config triggered", "event-file", event.Filename)
err := a.AutoReloadConfig()
if err != nil {
a.baseDeps.Logger.Error("error loading config", "error", err)
}
if a.configFileWatcher != nil {
a.baseDeps.Logger.Debug("starting file watcher")
a.configFileWatcher.Start(context.Background())
go func() {
for event := range a.configFileWatcher.EventsCh() {
a.baseDeps.Logger.Debug("auto-reload config triggered", "num-events", len(event.Filenames))
err := a.AutoReloadConfig()
if err != nil {
a.baseDeps.Logger.Error("error loading config", "error", err)
}
}()
}
}
}()
}
return nil
}
@ -1413,8 +1415,8 @@ func (a *Agent) ShutdownAgent() error {
a.stopAllWatches()
// Stop config file watcher
if a.FileWatcher != nil {
a.FileWatcher.Stop()
if a.configFileWatcher != nil {
a.configFileWatcher.Stop()
}
a.stopLicenseManager()
@ -3772,13 +3774,13 @@ func (a *Agent) reloadConfig(autoReload bool) error {
{a.config.TLS.HTTPS, newCfg.TLS.HTTPS},
} {
if f.oldCfg.KeyFile != f.newCfg.KeyFile {
a.FileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile)
a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile)
if err != nil {
return err
}
}
if f.oldCfg.CertFile != f.newCfg.CertFile {
a.FileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile)
a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile)
if err != nil {
return err
}

View File

@ -5545,7 +5545,8 @@ func TestAgent_AutoReloadDoReload_WhenCertThenKeyUpdated(t *testing.T) {
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
cert1 := srv.tlsConfigurator.Cert()
cert1Pub := srv.tlsConfigurator.Cert().Certificate
cert1Key := srv.tlsConfigurator.Cert().PrivateKey
certNew, privateKeyNew, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
@ -5575,8 +5576,10 @@ func TestAgent_AutoReloadDoReload_WhenCertThenKeyUpdated(t *testing.T) {
// cert should not change as we did not update the associated key
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.Equal(r, cert1.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.Equal(r, cert1.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.Equal(r, cert1Pub, cert.Certificate)
require.Equal(r, cert1Key, cert.PrivateKey)
})
require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKeyNew), 0600))
@ -5584,8 +5587,8 @@ func TestAgent_AutoReloadDoReload_WhenCertThenKeyUpdated(t *testing.T) {
// cert should change as we did not update the associated key
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.NotEqual(r, cert1.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert1.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
require.NotEqual(r, cert1Pub, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert1Key, srv.tlsConfigurator.Cert().PrivateKey)
})
}
@ -5647,11 +5650,13 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
`), 0600))
srv := StartTestAgent(t, TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}})
defer srv.Shutdown()
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
cert1 := srv.tlsConfigurator.Cert()
cert1Pub := srv.tlsConfigurator.Cert().Certificate
cert1Key := srv.tlsConfigurator.Cert().PrivateKey
certNew, privateKeyNew, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
@ -5667,8 +5672,10 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
// cert should not change as we did not update the associated key
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.Equal(r, cert1.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.Equal(r, cert1.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.Equal(r, cert1Pub, cert.Certificate)
require.Equal(r, cert1Key, cert.PrivateKey)
})
require.NoError(t, ioutil.WriteFile(certFileNew, []byte(certNew), 0600))
@ -5689,10 +5696,13 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
// cert should change as we did not update the associated key
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.NotEqual(r, cert1.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert1.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.NotEqual(r, cert1Key, cert.Certificate)
require.NotEqual(r, cert1Key, cert.PrivateKey)
})
cert2 := srv.tlsConfigurator.Cert()
cert2Pub := srv.tlsConfigurator.Cert().Certificate
cert2Key := srv.tlsConfigurator.Cert().PrivateKey
certNew2, privateKeyNew2, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
@ -5707,8 +5717,10 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
// cert should not change as we did not update the associated cert
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.Equal(r, cert2.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.Equal(r, cert2.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.Equal(r, cert2Pub, cert.Certificate)
require.Equal(r, cert2Key, cert.PrivateKey)
})
require.NoError(t, ioutil.WriteFile(certFileNew, []byte(certNew2), 0600))
@ -5716,7 +5728,120 @@ func TestAgent_AutoReloadDoReload_WhenKeyThenCertUpdated(t *testing.T) {
// cert should change as we did update the associated key
time.Sleep(1 * time.Second)
retry.Run(t, func(r *retry.R) {
require.NotEqual(r, cert2.Certificate, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert2.PrivateKey, srv.tlsConfigurator.Cert().PrivateKey)
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.NotEqual(r, cert2Pub, cert.Certificate)
require.NotEqual(r, cert2Key, cert.PrivateKey)
})
}
func Test_coalesceTimerTwoPeriods(t *testing.T) {
certsDir := testutil.TempDir(t, "auto-config")
// write some test TLS certificates out to the cfg dir
serverName := "server.dc1.consul"
signer, _, err := tlsutil.GeneratePrivateKey()
require.NoError(t, err)
ca, _, err := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
require.NoError(t, err)
cert, privateKey, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
CA: ca,
Name: "Test Cert Name",
Days: 365,
DNSNames: []string{serverName},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
})
require.NoError(t, err)
certFile := filepath.Join(certsDir, "cert.pem")
caFile := filepath.Join(certsDir, "cacert.pem")
keyFile := filepath.Join(certsDir, "key.pem")
require.NoError(t, ioutil.WriteFile(certFile, []byte(cert), 0600))
require.NoError(t, ioutil.WriteFile(caFile, []byte(ca), 0600))
require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKey), 0600))
// generate a gossip key
gossipKey := make([]byte, 32)
n, err := rand.Read(gossipKey)
require.NoError(t, err)
require.Equal(t, 32, n)
gossipKeyEncoded := base64.StdEncoding.EncodeToString(gossipKey)
hclConfig := TestACLConfigWithParams(nil)
configFile := testutil.TempDir(t, "config") + "/config.hcl"
require.NoError(t, ioutil.WriteFile(configFile, []byte(`
encrypt = "`+gossipKeyEncoded+`"
encrypt_verify_incoming = true
encrypt_verify_outgoing = true
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
ca_file = "`+caFile+`"
cert_file = "`+certFile+`"
key_file = "`+keyFile+`"
connect { enabled = true }
auto_reload_config = true
`), 0600))
coalesceInterval := 100 * time.Millisecond
testAgent := TestAgent{Name: "TestAgent-Server", HCL: hclConfig, configFiles: []string{configFile}, Config: &config.RuntimeConfig{
AutoReloadConfigCoalesceInterval: coalesceInterval,
}}
srv := StartTestAgent(t, testAgent)
defer srv.Shutdown()
testrpc.WaitForTestAgent(t, srv.RPC, "dc1", testrpc.WithToken(TestDefaultInitialManagementToken))
cert1Pub := srv.tlsConfigurator.Cert().Certificate
cert1Key := srv.tlsConfigurator.Cert().PrivateKey
certNew, privateKeyNew, err := tlsutil.GenerateCert(tlsutil.CertOpts{
Signer: signer,
CA: ca,
Name: "Test Cert Name",
Days: 365,
DNSNames: []string{serverName},
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth},
})
require.NoError(t, err)
certFileNew := filepath.Join(certsDir, "cert_new.pem")
require.NoError(t, ioutil.WriteFile(certFileNew, []byte(certNew), 0600))
require.NoError(t, ioutil.WriteFile(configFile, []byte(`
encrypt = "`+gossipKeyEncoded+`"
encrypt_verify_incoming = true
encrypt_verify_outgoing = true
verify_incoming = true
verify_outgoing = true
verify_server_hostname = true
ca_file = "`+caFile+`"
cert_file = "`+certFileNew+`"
key_file = "`+keyFile+`"
connect { enabled = true }
auto_reload_config = true
`), 0600))
// cert should not change as we did not update the associated key
time.Sleep(coalesceInterval * 2)
retry.Run(t, func(r *retry.R) {
cert := srv.tlsConfigurator.Cert()
require.NotNil(r, cert)
require.Equal(r, cert1Pub, cert.Certificate)
require.Equal(r, cert1Key, cert.PrivateKey)
})
require.NoError(t, ioutil.WriteFile(keyFile, []byte(privateKeyNew), 0600))
// cert should change as we did not update the associated key
time.Sleep(coalesceInterval * 2)
retry.Run(t, func(r *retry.R) {
require.NotEqual(r, cert1Pub, srv.tlsConfigurator.Cert().Certificate)
require.NotEqual(r, cert1Key, srv.tlsConfigurator.Cert().PrivateKey)
})
}

View File

@ -1004,64 +1004,65 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
LogRotateBytes: intVal(c.LogRotateBytes),
LogRotateMaxFiles: intVal(c.LogRotateMaxFiles),
},
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
MaxQueryTime: b.durationVal("max_query_time", c.MaxQueryTime),
NodeID: types.NodeID(stringVal(c.NodeID)),
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),
PrimaryGatewaysInterval: b.durationVal("primary_gateways_interval", c.PrimaryGatewaysInterval),
RPCAdvertiseAddr: rpcAdvertiseAddr,
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: boolValWithDefault(c.RPC.EnableStreaming, serverMode)},
RaftProtocol: intVal(c.RaftProtocol),
RaftSnapshotThreshold: intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
RaftTrailingLogs: intVal(c.RaftTrailingLogs),
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
RejoinAfterLeave: boolVal(c.RejoinAfterLeave),
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
RetryJoinMaxAttemptsLAN: intVal(c.RetryJoinMaxAttemptsLAN),
RetryJoinMaxAttemptsWAN: intVal(c.RetryJoinMaxAttemptsWAN),
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
SegmentName: stringVal(c.SegmentName),
Segments: segments,
SegmentLimit: intVal(c.SegmentLimit),
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
SerfAllowedCIDRsLAN: serfAllowedCIDRSLAN,
SerfAllowedCIDRsWAN: serfAllowedCIDRSWAN,
SerfBindAddrLAN: serfBindAddrLAN,
SerfBindAddrWAN: serfBindAddrWAN,
SerfPortLAN: serfPortLAN,
SerfPortWAN: serfPortWAN,
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
TaggedAddresses: c.TaggedAddresses,
TranslateWANAddrs: boolVal(c.TranslateWANAddrs),
TxnMaxReqLen: uint64Val(c.Limits.TxnMaxReqLen),
UIConfig: b.uiConfigVal(c.UIConfig),
UnixSocketGroup: stringVal(c.UnixSocket.Group),
UnixSocketMode: stringVal(c.UnixSocket.Mode),
UnixSocketUser: stringVal(c.UnixSocket.User),
Watches: c.Watches,
AutoReloadConfigCoalesceInterval: 1 * time.Second,
}
rt.TLS, err = b.buildTLSConfig(rt, c.TLS)

View File

@ -44,7 +44,7 @@ type watchedFile struct {
}
type FileWatcherEvent struct {
Filename string
Filenames []string
}
//NewFileWatcher create a file watcher that will watch all the files/folders from configFiles
@ -213,7 +213,7 @@ func (w *fileWatcher) handleEvent(ctx context.Context, event fsnotify.Event) err
if isCreateEvent(event) || isWriteEvent(event) || isRenameEvent(event) {
w.logger.Trace("call the handler", "filename", event.Name, "OP", event.Op)
select {
case w.eventsCh <- &FileWatcherEvent{Filename: filename}:
case w.eventsCh <- &FileWatcherEvent{Filenames: []string{filename}}:
case <-ctx.Done():
return ctx.Err()
}
@ -265,7 +265,7 @@ func (w *fileWatcher) reconcile(ctx context.Context) {
w.logger.Trace("call the handler", "filename", filename, "old modTime", configFile.modTime, "new modTime", newModTime)
configFile.modTime = newModTime
select {
case w.eventsCh <- &FileWatcherEvent{Filename: filename}:
case w.eventsCh <- &FileWatcherEvent{Filenames: []string{filename}}:
case <-ctx.Done():
return
}

View File

@ -64,6 +64,23 @@ func TestWatcherAddRemove(t *testing.T) {
}
func TestWatcherReplace(t *testing.T) {
var filepaths []string
wi, err := NewFileWatcher(filepaths, hclog.New(&hclog.LoggerOptions{}))
w := wi.(*fileWatcher)
require.NoError(t, err)
file1 := createTempConfigFile(t, "temp_config1")
err = w.Add(file1)
require.NoError(t, err)
file2 := createTempConfigFile(t, "temp_config2")
err = w.Replace(file1, file2)
require.NoError(t, err)
_, ok := w.configFiles[file1]
require.False(t, ok)
_, ok = w.configFiles[file2]
require.True(t, ok)
}
func TestWatcherAddWhileRunning(t *testing.T) {
var filepaths []string
wi, err := NewFileWatcher(filepaths, hclog.New(&hclog.LoggerOptions{}))
@ -364,8 +381,8 @@ func TestEventWatcherMoveSoftLink(t *testing.T) {
func assertEvent(name string, watcherCh chan *FileWatcherEvent, timeout time.Duration) error {
select {
case ev := <-watcherCh:
if ev.Filename != name && !strings.Contains(ev.Filename, name) {
return fmt.Errorf("filename do not match %s %s", ev.Filename, name)
if ev.Filenames[0] != name && !strings.Contains(ev.Filenames[0], name) {
return fmt.Errorf("filename do not match %s %s", ev.Filenames[0], name)
}
return nil
case <-time.After(timeout):

View File

@ -0,0 +1,90 @@
package config
import (
"context"
"time"
"github.com/hashicorp/go-hclog"
)
type rateLimitedFileWatcher struct {
watcher Watcher
eventCh chan *FileWatcherEvent
coalesceInterval time.Duration
}
func (r *rateLimitedFileWatcher) Start(ctx context.Context) {
r.watcher.Start(ctx)
r.coalesceTimer(ctx, r.watcher.EventsCh(), r.coalesceInterval)
}
func (r rateLimitedFileWatcher) Stop() error {
return r.watcher.Stop()
}
func (r rateLimitedFileWatcher) Add(filename string) error {
return r.watcher.Add(filename)
}
func (r rateLimitedFileWatcher) Remove(filename string) {
r.watcher.Remove(filename)
}
func (r rateLimitedFileWatcher) Replace(oldFile, newFile string) error {
return r.watcher.Replace(oldFile, newFile)
}
func (r rateLimitedFileWatcher) EventsCh() chan *FileWatcherEvent {
return r.eventCh
}
func NewRateLimitedFileWatcher(configFiles []string, logger hclog.Logger, coalesceInterval time.Duration) (Watcher, error) {
watcher, err := NewFileWatcher(configFiles, logger)
if err != nil {
return nil, err
}
return &rateLimitedFileWatcher{
watcher: watcher,
coalesceInterval: coalesceInterval,
eventCh: make(chan *FileWatcherEvent),
}, nil
}
func (r rateLimitedFileWatcher) coalesceTimer(ctx context.Context, inputCh chan *FileWatcherEvent, coalesceDuration time.Duration) {
var (
coalesceTimer *time.Timer
sendCh = make(chan struct{})
fileWatcherEvents []string
)
go func() {
for {
select {
case event, ok := <-inputCh:
if !ok {
if len(fileWatcherEvents) > 0 {
r.eventCh <- &FileWatcherEvent{Filenames: fileWatcherEvents}
}
close(r.eventCh)
return
}
fileWatcherEvents = append(fileWatcherEvents, event.Filenames...)
if coalesceTimer == nil {
coalesceTimer = time.AfterFunc(coalesceDuration, func() {
// This runs in another goroutine so we can't just do the send
// directly here as access to fileWatcherEvents is racy. Instead,
// signal the main loop above.
sendCh <- struct{}{}
})
}
case <-sendCh:
coalesceTimer = nil
r.eventCh <- &FileWatcherEvent{Filenames: fileWatcherEvents}
fileWatcherEvents = make([]string, 0)
case <-ctx.Done():
return
}
}
}()
}

View File

@ -0,0 +1,91 @@
package config
import (
"context"
"os"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)
func TestNewRateLimitedWatcher(t *testing.T) {
w, err := NewRateLimitedFileWatcher([]string{}, hclog.New(&hclog.LoggerOptions{}), 1*time.Nanosecond)
require.NoError(t, err)
require.NotNil(t, w)
}
func TestRateLimitedWatcherRenameEvent(t *testing.T) {
fileTmp := createTempConfigFile(t, "temp_config3")
filepaths := []string{createTempConfigFile(t, "temp_config1"), createTempConfigFile(t, "temp_config2")}
w, err := NewRateLimitedFileWatcher(filepaths, hclog.New(&hclog.LoggerOptions{}), 1*time.Nanosecond)
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
require.NoError(t, err)
err = os.Rename(fileTmp, filepaths[0])
time.Sleep(timeoutDuration + 50*time.Millisecond)
require.NoError(t, err)
require.NoError(t, assertEvent(filepaths[0], w.EventsCh(), defaultTimeout))
// make sure we consume all events
_ = assertEvent(filepaths[0], w.EventsCh(), defaultTimeout)
}
func TestRateLimitedWatcherAddNotExist(t *testing.T) {
file := testutil.TempFile(t, "temp_config")
filename := file.Name() + randomStr(16)
w, err := NewRateLimitedFileWatcher([]string{filename}, hclog.New(&hclog.LoggerOptions{}), 1*time.Nanosecond)
require.Error(t, err, "no such file or directory")
require.Nil(t, w)
}
func TestEventRateLimitedWatcherWrite(t *testing.T) {
file := testutil.TempFile(t, "temp_config")
_, err := file.WriteString("test config")
require.NoError(t, err)
err = file.Sync()
require.NoError(t, err)
w, err := NewRateLimitedFileWatcher([]string{file.Name()}, hclog.New(&hclog.LoggerOptions{}), 1*time.Nanosecond)
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
_, err = file.WriteString("test config 2")
require.NoError(t, err)
err = file.Sync()
require.NoError(t, err)
require.NoError(t, assertEvent(file.Name(), w.EventsCh(), defaultTimeout))
}
func TestEventRateLimitedWatcherMove(t *testing.T) {
filepath := createTempConfigFile(t, "temp_config1")
w, err := NewRateLimitedFileWatcher([]string{filepath}, hclog.New(&hclog.LoggerOptions{}), 1*time.Second)
require.NoError(t, err)
w.Start(context.Background())
defer func() {
_ = w.Stop()
}()
for i := 0; i < 10; i++ {
filepath2 := createTempConfigFile(t, "temp_config2")
err = os.Rename(filepath2, filepath)
time.Sleep(timeoutDuration + 50*time.Millisecond)
require.NoError(t, err)
}
require.NoError(t, assertEvent(filepath, w.EventsCh(), defaultTimeout))
require.Error(t, assertEvent(filepath, w.EventsCh(), defaultTimeout), "expected timeout error")
}

View File

@ -1399,6 +1399,9 @@ type RuntimeConfig struct {
//
Watches []map[string]interface{}
// AutoReloadConfigCoalesceInterval Coalesce Interval for auto reload config
AutoReloadConfigCoalesceInterval time.Duration
EnterpriseRuntimeConfig
}

View File

@ -6390,7 +6390,8 @@ func TestLoad_FullConfig(t *testing.T) {
"args": []interface{}{"dltjDJ2a", "flEa7C2d"},
},
},
RaftBoltDBConfig: consul.RaftBoltDBConfig{NoFreelistSync: true},
RaftBoltDBConfig: consul.RaftBoltDBConfig{NoFreelistSync: true},
AutoReloadConfigCoalesceInterval: 1 * time.Second,
}
entFullRuntimeConfig(expected)

View File

@ -64,6 +64,7 @@
"AutoEncryptIPSAN": [],
"AutoEncryptTLS": false,
"AutoReloadConfig": false,
"AutoReloadConfigCoalesceInterval": "0s",
"AutopilotCleanupDeadServers": false,
"AutopilotDisableUpgradeMigration": false,
"AutopilotLastContactThreshold": "0s",
@ -456,4 +457,4 @@
"Version": "",
"VersionPrerelease": "",
"Watches": []
}
}

View File

@ -221,6 +221,9 @@ func (a *TestAgent) Start(t *testing.T) error {
bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute)
}
if a.Config != nil && bd.RuntimeConfig.AutoReloadConfigCoalesceInterval == 0 {
bd.RuntimeConfig.AutoReloadConfigCoalesceInterval = a.Config.AutoReloadConfigCoalesceInterval
}
a.Config = bd.RuntimeConfig
agent, err := New(bd)