Merge pull request #204 from hashicorp/f-check-freq
Add deferred sync for check output
This commit is contained in:
commit
8c6c8e1f6c
|
@ -173,6 +173,14 @@ type Config struct {
|
||||||
// true, we ignore the leave, and rejoin the cluster on start.
|
// true, we ignore the leave, and rejoin the cluster on start.
|
||||||
RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"`
|
RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"`
|
||||||
|
|
||||||
|
// CheckUpdateInterval controls the interval on which the output of a health check
|
||||||
|
// is updated if there is no change to the state. For example, a check in a steady
|
||||||
|
// state may run every 5 seconds generating a unique output (timestamp, etc), forcing
|
||||||
|
// constant writes. This allows Consul to defer the write for some period of time,
|
||||||
|
// reducing the write pressure when the state is steady.
|
||||||
|
CheckUpdateInterval time.Duration `mapstructure:"-"`
|
||||||
|
CheckUpdateIntervalRaw string `mapstructure:"check_update_interval" json:"-"`
|
||||||
|
|
||||||
// AEInterval controls the anti-entropy interval. This is how often
|
// AEInterval controls the anti-entropy interval. This is how often
|
||||||
// the agent attempts to reconcile its local state with the server's
|
// the agent attempts to reconcile its local state with the server's
|
||||||
// representation of our state. Defaults to every 60s.
|
// representation of our state. Defaults to every 60s.
|
||||||
|
@ -220,8 +228,9 @@ func DefaultConfig() *Config {
|
||||||
DNSConfig: DNSConfig{
|
DNSConfig: DNSConfig{
|
||||||
MaxStale: 5 * time.Second,
|
MaxStale: 5 * time.Second,
|
||||||
},
|
},
|
||||||
Protocol: consul.ProtocolVersionMax,
|
Protocol: consul.ProtocolVersionMax,
|
||||||
AEInterval: time.Minute,
|
CheckUpdateInterval: 5 * time.Minute,
|
||||||
|
AEInterval: time.Minute,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,6 +318,14 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if raw := result.CheckUpdateIntervalRaw; raw != "" {
|
||||||
|
dur, err := time.ParseDuration(raw)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("CheckUpdateInterval invalid: %v", err)
|
||||||
|
}
|
||||||
|
result.CheckUpdateInterval = dur
|
||||||
|
}
|
||||||
|
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -536,6 +553,9 @@ func MergeConfig(a, b *Config) *Config {
|
||||||
if b.DNSConfig.MaxStale != 0 {
|
if b.DNSConfig.MaxStale != 0 {
|
||||||
result.DNSConfig.MaxStale = b.DNSConfig.MaxStale
|
result.DNSConfig.MaxStale = b.DNSConfig.MaxStale
|
||||||
}
|
}
|
||||||
|
if b.CheckUpdateIntervalRaw != "" {
|
||||||
|
result.CheckUpdateInterval = b.CheckUpdateInterval
|
||||||
|
}
|
||||||
|
|
||||||
// Copy the start join addresses
|
// Copy the start join addresses
|
||||||
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
|
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
|
||||||
|
|
|
@ -324,6 +324,17 @@ func TestDecodeConfig(t *testing.T) {
|
||||||
if config.DNSConfig.ServiceTTL["web"] != 30*time.Second {
|
if config.DNSConfig.ServiceTTL["web"] != 30*time.Second {
|
||||||
t.Fatalf("bad: %#v", config)
|
t.Fatalf("bad: %#v", config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckUpdateInterval
|
||||||
|
input = `{"check_update_interval": "10m"}`
|
||||||
|
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.CheckUpdateInterval != 10*time.Minute {
|
||||||
|
t.Fatalf("bad: %#v", config)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeConfig_Service(t *testing.T) {
|
func TestDecodeConfig_Service(t *testing.T) {
|
||||||
|
@ -408,20 +419,21 @@ func TestDecodeConfig_Check(t *testing.T) {
|
||||||
|
|
||||||
func TestMergeConfig(t *testing.T) {
|
func TestMergeConfig(t *testing.T) {
|
||||||
a := &Config{
|
a := &Config{
|
||||||
Bootstrap: false,
|
Bootstrap: false,
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
DataDir: "/tmp/foo",
|
DataDir: "/tmp/foo",
|
||||||
DNSRecursor: "127.0.0.1:1001",
|
DNSRecursor: "127.0.0.1:1001",
|
||||||
Domain: "basic",
|
Domain: "basic",
|
||||||
LogLevel: "debug",
|
LogLevel: "debug",
|
||||||
NodeName: "foo",
|
NodeName: "foo",
|
||||||
ClientAddr: "127.0.0.1",
|
ClientAddr: "127.0.0.1",
|
||||||
BindAddr: "127.0.0.1",
|
BindAddr: "127.0.0.1",
|
||||||
AdvertiseAddr: "127.0.0.1",
|
AdvertiseAddr: "127.0.0.1",
|
||||||
Server: false,
|
Server: false,
|
||||||
LeaveOnTerm: false,
|
LeaveOnTerm: false,
|
||||||
SkipLeaveOnInt: false,
|
SkipLeaveOnInt: false,
|
||||||
EnableDebug: false,
|
EnableDebug: false,
|
||||||
|
CheckUpdateIntervalRaw: "8m",
|
||||||
}
|
}
|
||||||
|
|
||||||
b := &Config{
|
b := &Config{
|
||||||
|
@ -451,21 +463,23 @@ func TestMergeConfig(t *testing.T) {
|
||||||
SerfWan: 5,
|
SerfWan: 5,
|
||||||
Server: 6,
|
Server: 6,
|
||||||
},
|
},
|
||||||
Server: true,
|
Server: true,
|
||||||
LeaveOnTerm: true,
|
LeaveOnTerm: true,
|
||||||
SkipLeaveOnInt: true,
|
SkipLeaveOnInt: true,
|
||||||
EnableDebug: true,
|
EnableDebug: true,
|
||||||
VerifyIncoming: true,
|
VerifyIncoming: true,
|
||||||
VerifyOutgoing: true,
|
VerifyOutgoing: true,
|
||||||
CAFile: "test/ca.pem",
|
CAFile: "test/ca.pem",
|
||||||
CertFile: "test/cert.pem",
|
CertFile: "test/cert.pem",
|
||||||
KeyFile: "test/key.pem",
|
KeyFile: "test/key.pem",
|
||||||
Checks: []*CheckDefinition{nil},
|
Checks: []*CheckDefinition{nil},
|
||||||
Services: []*ServiceDefinition{nil},
|
Services: []*ServiceDefinition{nil},
|
||||||
StartJoin: []string{"1.1.1.1"},
|
StartJoin: []string{"1.1.1.1"},
|
||||||
UiDir: "/opt/consul-ui",
|
UiDir: "/opt/consul-ui",
|
||||||
EnableSyslog: true,
|
EnableSyslog: true,
|
||||||
RejoinAfterLeave: true,
|
RejoinAfterLeave: true,
|
||||||
|
CheckUpdateInterval: 8 * time.Minute,
|
||||||
|
CheckUpdateIntervalRaw: "8m",
|
||||||
}
|
}
|
||||||
|
|
||||||
c := MergeConfig(a, b)
|
c := MergeConfig(a, b)
|
||||||
|
|
|
@ -18,8 +18,9 @@ const (
|
||||||
// syncStatus is used to represent the difference between
|
// syncStatus is used to represent the difference between
|
||||||
// the local and remote state, and if action needs to be taken
|
// the local and remote state, and if action needs to be taken
|
||||||
type syncStatus struct {
|
type syncStatus struct {
|
||||||
remoteDelete bool // Should this be deleted from the server
|
remoteDelete bool // Should this be deleted from the server
|
||||||
inSync bool // Is this in sync with the server
|
inSync bool // Is this in sync with the server
|
||||||
|
deferSync *time.Timer // Defer sync until this time
|
||||||
}
|
}
|
||||||
|
|
||||||
// localState is used to represent the node's services,
|
// localState is used to represent the node's services,
|
||||||
|
@ -191,6 +192,25 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Defer a sync if the output has changed. This is an optimization around
|
||||||
|
// frequent updates of output. Instead, we update the output internally,
|
||||||
|
// and periodically do a write-back to the servers. If there is a status
|
||||||
|
// change we do the write immediately.
|
||||||
|
if l.config.CheckUpdateInterval > 0 && check.Status == status {
|
||||||
|
check.Output = output
|
||||||
|
status := l.checkStatus[checkID]
|
||||||
|
if status.deferSync == nil && status.inSync {
|
||||||
|
deferSync := time.AfterFunc(l.config.CheckUpdateInterval, func() {
|
||||||
|
l.Lock()
|
||||||
|
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||||
|
l.changeMade()
|
||||||
|
l.Unlock()
|
||||||
|
})
|
||||||
|
l.checkStatus[checkID] = syncStatus{inSync: true, deferSync: deferSync}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Update status and mark out of sync
|
// Update status and mark out of sync
|
||||||
check.Status = status
|
check.Status = status
|
||||||
check.Output = output
|
check.Output = output
|
||||||
|
|
|
@ -252,3 +252,77 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||||
|
conf := nextConfig()
|
||||||
|
conf.CheckUpdateInterval = 100 * time.Millisecond
|
||||||
|
dir, agent := makeAgent(t, conf)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer agent.Shutdown()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||||
|
|
||||||
|
// Create a check
|
||||||
|
check := &structs.HealthCheck{
|
||||||
|
Node: agent.config.NodeName,
|
||||||
|
CheckID: "web",
|
||||||
|
Name: "web",
|
||||||
|
Status: structs.HealthPassing,
|
||||||
|
Output: "",
|
||||||
|
}
|
||||||
|
agent.state.AddCheck(check)
|
||||||
|
|
||||||
|
// Trigger anti-entropy run and wait
|
||||||
|
agent.StartSync()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify that we are in sync
|
||||||
|
req := structs.NodeSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: agent.config.NodeName,
|
||||||
|
}
|
||||||
|
var checks structs.IndexedHealthChecks
|
||||||
|
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify checks in place
|
||||||
|
if len(checks.HealthChecks) != 2 {
|
||||||
|
t.Fatalf("checks: %v", check)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the check output! Should be deferred
|
||||||
|
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
|
||||||
|
|
||||||
|
// Should not update for 100 milliseconds
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify not updated
|
||||||
|
for _, chk := range checks.HealthChecks {
|
||||||
|
switch chk.CheckID {
|
||||||
|
case "web":
|
||||||
|
if chk.Output != "" {
|
||||||
|
t.Fatalf("early update: %v", chk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a deferred update
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the deferred update has now been applied
|
||||||
|
for _, chk := range checks.HealthChecks {
|
||||||
|
switch chk.CheckID {
|
||||||
|
case "web":
|
||||||
|
if chk.Output != "output" {
|
||||||
|
t.Fatalf("no update: %v", chk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -179,6 +179,14 @@ definitions support being updated during a reload.
|
||||||
The certificate is provided to clients or servers to verify the agent's authenticity.
|
The certificate is provided to clients or servers to verify the agent's authenticity.
|
||||||
Must be provided along with the `key_file`.
|
Must be provided along with the `key_file`.
|
||||||
|
|
||||||
|
* `check_update_interval` - This interval controls how often check output from
|
||||||
|
checks in a steady state is synchronized with the server. By default, this is
|
||||||
|
set to 5 minutes ("5m"). Many checks which are in a steady state produce
|
||||||
|
slightly different output per run (timestamps, etc) which cause constant writes.
|
||||||
|
This configuration allows deferring the sync of check output for a given interval to
|
||||||
|
reduce write pressure. If a check ever changes state, the new state and associated
|
||||||
|
output is synchronized immediately. To disable this behavior, set the value to "0s".
|
||||||
|
|
||||||
* `domain` - By default, Consul responds to DNS queries in the "consul." domain.
|
* `domain` - By default, Consul responds to DNS queries in the "consul." domain.
|
||||||
This flag can be used to change that domain. All queries in this domain are assumed
|
This flag can be used to change that domain. All queries in this domain are assumed
|
||||||
to be handled by Consul, and will not be recursively resolved.
|
to be handled by Consul, and will not be recursively resolved.
|
||||||
|
|
Loading…
Reference in New Issue