Merge pull request #2859 from hashicorp/f-heartbeat-tunables

Allow tuning of heartbeat ttls
This commit is contained in:
Alex Dadgar 2017-07-19 10:07:36 -07:00 committed by GitHub
commit 15912d29d2
8 changed files with 111 additions and 59 deletions

View file

@ -192,12 +192,14 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
conf.DeploymentGCThreshold = dur conf.DeploymentGCThreshold = dur
} }
if heartbeatGrace := agentConfig.Server.HeartbeatGrace; heartbeatGrace != "" { if heartbeatGrace := agentConfig.Server.HeartbeatGrace; heartbeatGrace != 0 {
dur, err := time.ParseDuration(heartbeatGrace) conf.HeartbeatGrace = heartbeatGrace
if err != nil { }
return nil, err if min := agentConfig.Server.MinHeartbeatTTL; min != 0 {
} conf.MinHeartbeatTTL = min
conf.HeartbeatGrace = dur }
if maxHPS := agentConfig.Server.MaxHeartbeatsPerSecond; maxHPS != 0 {
conf.MaxHeartbeatsPerSecond = maxHPS
} }
if *agentConfig.Consul.AutoAdvertise && agentConfig.Consul.ServerServiceName == "" { if *agentConfig.Consul.AutoAdvertise && agentConfig.Consul.ServerServiceName == "" {

View file

@ -233,24 +233,24 @@ func TestAgent_ServerConfig(t *testing.T) {
t.Fatalf("expect 10s, got: %s", threshold) t.Fatalf("expect 10s, got: %s", threshold)
} }
conf.Server.HeartbeatGrace = "42g" conf.Server.HeartbeatGrace = 37 * time.Second
if err := conf.normalizeAddrs(); err != nil {
t.Fatalf("error normalizing config: %v", err)
}
out, err = a.serverConfig()
if err == nil || !strings.Contains(err.Error(), "unknown unit") {
t.Fatalf("expected unknown unit error, got: %#v", err)
}
conf.Server.HeartbeatGrace = "37s"
if err := conf.normalizeAddrs(); err != nil {
t.Fatalf("error normalizing config: %v", err)
}
out, err = a.serverConfig() out, err = a.serverConfig()
if threshold := out.HeartbeatGrace; threshold != time.Second*37 { if threshold := out.HeartbeatGrace; threshold != time.Second*37 {
t.Fatalf("expect 37s, got: %s", threshold) t.Fatalf("expect 37s, got: %s", threshold)
} }
conf.Server.MinHeartbeatTTL = 37 * time.Second
out, err = a.serverConfig()
if min := out.MinHeartbeatTTL; min != time.Second*37 {
t.Fatalf("expect 37s, got: %s", min)
}
conf.Server.MaxHeartbeatsPerSecond = 11.0
out, err = a.serverConfig()
if max := out.MaxHeartbeatsPerSecond; max != 11.0 {
t.Fatalf("expect 11, got: %s", max)
}
// Defaults to the global bind addr // Defaults to the global bind addr
conf.Addresses.RPC = "" conf.Addresses.RPC = ""
conf.Addresses.Serf = "" conf.Addresses.Serf = ""

View file

@ -73,6 +73,8 @@ server {
eval_gc_threshold = "12h" eval_gc_threshold = "12h"
deployment_gc_threshold = "12h" deployment_gc_threshold = "12h"
heartbeat_grace = "30s" heartbeat_grace = "30s"
min_heartbeat_ttl = "33s"
max_heartbeats_per_second = 11.0
retry_join = [ "1.1.1.1", "2.2.2.2" ] retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ] start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3 retry_max = 3

View file

@ -277,7 +277,16 @@ type ServerConfig struct {
// HeartbeatGrace is the grace period beyond the TTL to account for network, // HeartbeatGrace is the grace period beyond the TTL to account for network,
// processing delays and clock skew before marking a node as "down". // processing delays and clock skew before marking a node as "down".
HeartbeatGrace string `mapstructure:"heartbeat_grace"` HeartbeatGrace time.Duration `mapstructure:"heartbeat_grace"`
// MinHeartbeatTTL is the minimum time between heartbeats. This is used as
// a floor to prevent excessive updates.
MinHeartbeatTTL time.Duration `mapstructure:"min_heartbeat_ttl"`
// MaxHeartbeatsPerSecond is the maximum target rate of heartbeats
// being processed per second. This allows the TTL to be increased
// to meet the target rate.
MaxHeartbeatsPerSecond float64 `mapstructure:"max_heartbeats_per_second"`
// StartJoin is a list of addresses to attempt to join when the // StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these // agent starts. If Serf is unable to communicate with any of these
@ -924,9 +933,15 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.DeploymentGCThreshold != "" { if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold result.DeploymentGCThreshold = b.DeploymentGCThreshold
} }
if b.HeartbeatGrace != "" { if b.HeartbeatGrace != 0 {
result.HeartbeatGrace = b.HeartbeatGrace result.HeartbeatGrace = b.HeartbeatGrace
} }
if b.MinHeartbeatTTL != 0 {
result.MinHeartbeatTTL = b.MinHeartbeatTTL
}
if b.MaxHeartbeatsPerSecond != 0.0 {
result.MaxHeartbeatsPerSecond = b.MaxHeartbeatsPerSecond
}
if b.RetryMaxAttempts != 0 { if b.RetryMaxAttempts != 0 {
result.RetryMaxAttempts = b.RetryMaxAttempts result.RetryMaxAttempts = b.RetryMaxAttempts
} }

View file

@ -506,6 +506,8 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
"job_gc_threshold", "job_gc_threshold",
"deployment_gc_threshold", "deployment_gc_threshold",
"heartbeat_grace", "heartbeat_grace",
"min_heartbeat_ttl",
"max_heartbeats_per_second",
"start_join", "start_join",
"retry_join", "retry_join",
"retry_max", "retry_max",
@ -523,7 +525,15 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
} }
var config ServerConfig var config ServerConfig
if err := mapstructure.WeakDecode(m, &config); err != nil { dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &config,
})
if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err return err
} }

View file

@ -82,23 +82,25 @@ func TestConfig_Parse(t *testing.T) {
NoHostUUID: helper.BoolToPtr(false), NoHostUUID: helper.BoolToPtr(false),
}, },
Server: &ServerConfig{ Server: &ServerConfig{
Enabled: true, Enabled: true,
BootstrapExpect: 5, BootstrapExpect: 5,
DataDir: "/tmp/data", DataDir: "/tmp/data",
ProtocolVersion: 3, ProtocolVersion: 3,
NumSchedulers: 2, NumSchedulers: 2,
EnabledSchedulers: []string{"test"}, EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h", NodeGCThreshold: "12h",
EvalGCThreshold: "12h", EvalGCThreshold: "12h",
JobGCThreshold: "12h", JobGCThreshold: "12h",
DeploymentGCThreshold: "12h", DeploymentGCThreshold: "12h",
HeartbeatGrace: "30s", HeartbeatGrace: 30 * time.Second,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, MinHeartbeatTTL: 33 * time.Second,
StartJoin: []string{"1.1.1.1", "2.2.2.2"}, MaxHeartbeatsPerSecond: 11.0,
RetryInterval: "15s", RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RejoinAfterLeave: true, StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryMaxAttempts: 3, RetryInterval: "15s",
EncryptKey: "abc", RejoinAfterLeave: true,
RetryMaxAttempts: 3,
EncryptKey: "abc",
}, },
Telemetry: &Telemetry{ Telemetry: &Telemetry{
StatsiteAddr: "127.0.0.1:1234", StatsiteAddr: "127.0.0.1:1234",

View file

@ -90,13 +90,15 @@ func TestConfig_Merge(t *testing.T) {
}, },
}, },
Server: &ServerConfig{ Server: &ServerConfig{
Enabled: false, Enabled: false,
BootstrapExpect: 1, BootstrapExpect: 1,
DataDir: "/tmp/data1", DataDir: "/tmp/data1",
ProtocolVersion: 1, ProtocolVersion: 1,
NumSchedulers: 1, NumSchedulers: 1,
NodeGCThreshold: "1h", NodeGCThreshold: "1h",
HeartbeatGrace: "30s", HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
MaxHeartbeatsPerSecond: 30.0,
}, },
Ports: &Ports{ Ports: &Ports{
HTTP: 4646, HTTP: 4646,
@ -220,19 +222,21 @@ func TestConfig_Merge(t *testing.T) {
GCInodeUsageThreshold: 86, GCInodeUsageThreshold: 86,
}, },
Server: &ServerConfig{ Server: &ServerConfig{
Enabled: true, Enabled: true,
BootstrapExpect: 2, BootstrapExpect: 2,
DataDir: "/tmp/data2", DataDir: "/tmp/data2",
ProtocolVersion: 2, ProtocolVersion: 2,
NumSchedulers: 2, NumSchedulers: 2,
EnabledSchedulers: []string{structs.JobTypeBatch}, EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h", NodeGCThreshold: "12h",
HeartbeatGrace: "2m", HeartbeatGrace: 2 * time.Minute,
RejoinAfterLeave: true, MinHeartbeatTTL: 2 * time.Minute,
StartJoin: []string{"1.1.1.1"}, MaxHeartbeatsPerSecond: 200.0,
RetryJoin: []string{"1.1.1.1"}, RejoinAfterLeave: true,
RetryInterval: "10s", StartJoin: []string{"1.1.1.1"},
retryInterval: time.Second * 10, RetryJoin: []string{"1.1.1.1"},
RetryInterval: "10s",
retryInterval: time.Second * 10,
}, },
Ports: &Ports{ Ports: &Ports{
HTTP: 20000, HTTP: 20000,

View file

@ -80,6 +80,23 @@ server {
deployment must be in the terminal state before it is eligible for garbage deployment must be in the terminal state before it is eligible for garbage
collection. This is specified using a label suffix like "30s" or "1h". collection. This is specified using a label suffix like "30s" or "1h".
- `heartbeat_grace` `(string: "10s")` - Specifies the additional time given as a
grace period beyond the heartbeat TTL of nodes to account for network and
processing delays as well as clock skew. This is specified using a label
suffix like "30s" or "1h".
- `min_heartbeat_ttl` `(string: "10s")` - Specifies the minimum time between
node heartbeats. This is used as a floor to prevent excessive updates. This is
specified using a label suffix like "30s" or "1h". Lowering the minimum TTL is
a tradeoff as it lowers failure detection time of nodes at the tradeoff of
false positives and increased load on the leader.
- `max_heartbeats_per_second` `(float: 50.0)` - Specifies the maximum target
rate of heartbeats being processed per second. This allows the TTL to be
increased to meet the target rate. Increasing the maximum heartbeats per
second is a tradeoff as it lowers failure detection time of nodes at the
tradeoff of false positives and increased load on the leader.
- `num_schedulers` `(int: [num-cores])` - Specifies the number of parallel - `num_schedulers` `(int: [num-cores])` - Specifies the number of parallel
scheduler threads to run. This can be as many as one per core, or `0` to scheduler threads to run. This can be as many as one per core, or `0` to
disallow this server from making any scheduling decisions. This defaults to disallow this server from making any scheduling decisions. This defaults to