From c99e6e082c28d04033ca3435b6ede8977eab15c1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 12:13:14 -0700 Subject: [PATCH 1/6] agent: Adding new CheckUpdateInterval config --- command/agent/config.go | 24 +++++++++++++++++++-- command/agent/config_test.go | 42 +++++++++++++++++++++++------------- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/command/agent/config.go b/command/agent/config.go index 4d038df89..f7f296a54 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -173,6 +173,14 @@ type Config struct { // true, we ignore the leave, and rejoin the cluster on start. RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"` + // CheckUpdateInterval controls the interval on which the output of a health check + // is updated if there is no change to the state. For example, a check in a steady + // state may run every 5 second generating a unique output (timestamp, etc), forcing + // constant writes. This allows Consul to defer the write for some period of time, + // reducing the write pressure when the state is steady. + CheckUpdateInterval time.Duration `mapstructure:"-"` + CheckUpdateIntervalRaw string `mapstructure:"check_update_interval" json:"-"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -220,8 +228,9 @@ func DefaultConfig() *Config { DNSConfig: DNSConfig{ MaxStale: 5 * time.Second, }, - Protocol: consul.ProtocolVersionMax, - AEInterval: time.Minute, + Protocol: consul.ProtocolVersionMax, + CheckUpdateInterval: 5 * time.Minute, + AEInterval: time.Minute, } } @@ -309,6 +318,14 @@ func DecodeConfig(r io.Reader) (*Config, error) { } } + if raw := result.CheckUpdateIntervalRaw; raw != "" { + dur, err := time.ParseDuration(raw) + if err != nil { + return nil, fmt.Errorf("CheckUpdateInterval invalid: %v", err) + } + result.CheckUpdateInterval = dur + } + return &result, nil } @@ -536,6 +553,9 @@ func MergeConfig(a, b *Config) *Config { if b.DNSConfig.MaxStale != 0 { result.DNSConfig.MaxStale = b.DNSConfig.MaxStale } + if b.CheckUpdateInterval != 0 { + result.CheckUpdateInterval = b.CheckUpdateInterval + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 767b17cf5..b1e2ef4c2 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -324,6 +324,17 @@ func TestDecodeConfig(t *testing.T) { if config.DNSConfig.ServiceTTL["web"] != 30*time.Second { t.Fatalf("bad: %#v", config) } + + // CheckUpdateInterval + input = `{"check_update_interval": "10m"}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if config.CheckUpdateInterval != 10*time.Minute { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_Service(t *testing.T) { @@ -451,21 +462,22 @@ func TestMergeConfig(t *testing.T) { SerfWan: 5, Server: 6, }, - Server: true, - LeaveOnTerm: true, - SkipLeaveOnInt: true, - EnableDebug: true, - VerifyIncoming: true, - VerifyOutgoing: true, - CAFile: "test/ca.pem", - CertFile: "test/cert.pem", - KeyFile: "test/key.pem", - Checks: []*CheckDefinition{nil}, - Services: []*ServiceDefinition{nil}, - StartJoin: []string{"1.1.1.1"}, - UiDir: "/opt/consul-ui", - EnableSyslog: true, - RejoinAfterLeave: true, + Server: true, + LeaveOnTerm: true, + SkipLeaveOnInt: true, + EnableDebug: true, + VerifyIncoming: true, + VerifyOutgoing: true, + CAFile: "test/ca.pem", + CertFile: "test/cert.pem", + KeyFile: "test/key.pem", + Checks: []*CheckDefinition{nil}, + Services: []*ServiceDefinition{nil}, + StartJoin: []string{"1.1.1.1"}, + UiDir: "/opt/consul-ui", + EnableSyslog: true, + RejoinAfterLeave: true, + CheckUpdateInterval: 8 * time.Minute, } c := MergeConfig(a, b) From 99ac4dc1bb3c2d2186163a459bc261e24b4018d8 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 12:46:10 -0700 Subject: [PATCH 2/6] agent: Allow CheckUpdateInterval to be zero --- command/agent/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/config.go b/command/agent/config.go index f7f296a54..0abd04fa2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -553,7 +553,7 @@ func MergeConfig(a, b *Config) *Config { if b.DNSConfig.MaxStale != 0 { result.DNSConfig.MaxStale = b.DNSConfig.MaxStale } - if b.CheckUpdateInterval != 0 { + if b.CheckUpdateIntervalRaw != "" { result.CheckUpdateInterval = b.CheckUpdateInterval } From 33c2132949e6728ddadeed3349036bcc8413e4db Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 12:46:29 -0700 Subject: [PATCH 3/6] agent: Defer sync based on CheckUpdateInterval --- command/agent/local.go | 24 +++++++++++- command/agent/local_test.go | 74 +++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index db03930f8..d501554d7 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -18,8 +18,9 @@ const ( // syncStatus is used to represent the difference between // the local and remote state, and if action needs to be taken type syncStatus struct { - remoteDelete bool // Should this be deleted from the server - inSync bool // Is this in sync with the server + remoteDelete bool // Should this be deleted from the server + inSync bool // Is this in sync with the server + deferSync *time.Timer // Defer sync until this time } // localState is used to represent the node's services, @@ -191,6 +192,25 @@ func (l *localState) UpdateCheck(checkID, status, output string) { return } + // Defer a sync if the output has changed. This is an optimization around + // frequent updates of output. Instead, we update the output internally, + // and periodically do a write-back to the servers. If there is a status + // change we do the write immediately. + if l.config.CheckUpdateInterval > 0 && check.Status == status { + check.Output = output + status := l.checkStatus[checkID] + if status.deferSync == nil && status.inSync { + deferSync := time.AfterFunc(l.config.CheckUpdateInterval, func() { + l.Lock() + l.checkStatus[checkID] = syncStatus{inSync: false} + l.changeMade() + l.Unlock() + }) + l.checkStatus[checkID] = syncStatus{deferSync: deferSync} + } + return + } + // Update status and mark out of sync check.Status = status check.Output = output diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 2cc5d9da8..658d8ca82 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -252,3 +252,77 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } } } + +func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { + conf := nextConfig() + conf.CheckUpdateInterval = 100 * time.Millisecond + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Create a check + check := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "web", + Name: "web", + Status: structs.HealthPassing, + Output: "", + } + agent.state.AddCheck(check) + + // Trigger anti-entropy run and wait + agent.StartSync() + time.Sleep(200 * time.Millisecond) + + // Verify that we are in sync + req := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: agent.config.NodeName, + } + var checks structs.IndexedHealthChecks + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify checks in place + if len(checks.HealthChecks) != 2 { + t.Fatalf("checks: %v", check) + } + + // Update the check output! Should be defered + agent.state.UpdateCheck("web", structs.HealthPassing, "output") + + // Should not update for 100 milliseconds + time.Sleep(50 * time.Millisecond) + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify not updated + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "" { + t.Fatalf("early update: %v", chk) + } + } + } + + // Wait for a defered update + time.Sleep(100 * time.Millisecond) + if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify not updated + for _, chk := range checks.HealthChecks { + switch chk.CheckID { + case "web": + if chk.Output != "output" { + t.Fatalf("no update: %v", chk) + } + } + } +} From 2568918055155c8c80a0585442983ff1b4441d85 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 12:51:12 -0700 Subject: [PATCH 4/6] website: Document the check_update_interval config --- website/source/docs/agent/options.html.markdown | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index bc95aea78..e108549a8 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -179,6 +179,14 @@ definitions support being updated during a reload. The certificate is provided to clients or servers to verify the agents authenticity. Must be provided along with the `key_file`. +* `check_update_interval` - This interval controls how often check output from + checks in a steady state is syncronized with the server. By default, this is + set to 5 minutes ("5m"). Many checks which are in a steady state produce + slightly different output per run (timestamps, etc) which cause constant writes. + This configuration allows defering the sync of check output for a given interval to + reduce write pressure. If a check ever changes state, the new state and associated + output is syncronized immediately. To disable this behavior, set the value to "0s". + * `domain` - By default, Consul responds to DNS queries in the "consul." domain. This flag can be used to change that domain. All queries in this domain are assumed to be handled by Consul, and will not be recursively resolved. From bc9ea2af9a9c1b5f85527946300c37c3262fb316 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 12:57:50 -0700 Subject: [PATCH 5/6] agent: leave inSync until the defer runs --- command/agent/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/local.go b/command/agent/local.go index d501554d7..d1457c7e5 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -206,7 +206,7 @@ func (l *localState) UpdateCheck(checkID, status, output string) { l.changeMade() l.Unlock() }) - l.checkStatus[checkID] = syncStatus{deferSync: deferSync} + l.checkStatus[checkID] = syncStatus{inSync: true, deferSync: deferSync} } return } From ca0887b4018977e42918b5b62db66b0cf756c03c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 9 Jun 2014 13:00:32 -0700 Subject: [PATCH 6/6] agent: Update config test to handle zero value CheckUpdateInterval --- command/agent/config_test.go | 62 +++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index b1e2ef4c2..45da38354 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -419,20 +419,21 @@ func TestDecodeConfig_Check(t *testing.T) { func TestMergeConfig(t *testing.T) { a := &Config{ - Bootstrap: false, - Datacenter: "dc1", - DataDir: "/tmp/foo", - DNSRecursor: "127.0.0.1:1001", - Domain: "basic", - LogLevel: "debug", - NodeName: "foo", - ClientAddr: "127.0.0.1", - BindAddr: "127.0.0.1", - AdvertiseAddr: "127.0.0.1", - Server: false, - LeaveOnTerm: false, - SkipLeaveOnInt: false, - EnableDebug: false, + Bootstrap: false, + Datacenter: "dc1", + DataDir: "/tmp/foo", + DNSRecursor: "127.0.0.1:1001", + Domain: "basic", + LogLevel: "debug", + NodeName: "foo", + ClientAddr: "127.0.0.1", + BindAddr: "127.0.0.1", + AdvertiseAddr: "127.0.0.1", + Server: false, + LeaveOnTerm: false, + SkipLeaveOnInt: false, + EnableDebug: false, + CheckUpdateIntervalRaw: "8m", } b := &Config{ @@ -462,22 +463,23 @@ func TestMergeConfig(t *testing.T) { SerfWan: 5, Server: 6, }, - Server: true, - LeaveOnTerm: true, - SkipLeaveOnInt: true, - EnableDebug: true, - VerifyIncoming: true, - VerifyOutgoing: true, - CAFile: "test/ca.pem", - CertFile: "test/cert.pem", - KeyFile: "test/key.pem", - Checks: []*CheckDefinition{nil}, - Services: []*ServiceDefinition{nil}, - StartJoin: []string{"1.1.1.1"}, - UiDir: "/opt/consul-ui", - EnableSyslog: true, - RejoinAfterLeave: true, - CheckUpdateInterval: 8 * time.Minute, + Server: true, + LeaveOnTerm: true, + SkipLeaveOnInt: true, + EnableDebug: true, + VerifyIncoming: true, + VerifyOutgoing: true, + CAFile: "test/ca.pem", + CertFile: "test/cert.pem", + KeyFile: "test/key.pem", + Checks: []*CheckDefinition{nil}, + Services: []*ServiceDefinition{nil}, + StartJoin: []string{"1.1.1.1"}, + UiDir: "/opt/consul-ui", + EnableSyslog: true, + RejoinAfterLeave: true, + CheckUpdateInterval: 8 * time.Minute, + CheckUpdateIntervalRaw: "8m", } c := MergeConfig(a, b)