From 524a1f0712636a1afcbc639532b66d50ffe2d309 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 30 Oct 2017 12:19:11 -0700 Subject: [PATCH] Publishing metrics for job summary --- command/agent/agent.go | 3 +++ nomad/config.go | 4 ++++ nomad/leader.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/command/agent/agent.go b/command/agent/agent.go index 0c8072629..e47e8be3e 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -233,6 +233,9 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi // Set the TLS config conf.TLSConfig = agentConfig.TLSConfig + // Setup telemetry related config + conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval + return conf, nil } diff --git a/nomad/config.go b/nomad/config.go index db4939106..fecac1dfa 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -245,6 +245,10 @@ type Config struct { // SentinelConfig is this Agent's Sentinel configuration SentinelConfig *config.SentinelConfig + + // StatsCollectionInterval is the interval at which the Nomad server + // publishes metrics which are periodic in nature like updating gauges + StatsCollectionInterval time.Duration } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/nomad/leader.go b/nomad/leader.go index 8a4d2dbab..97b52c2de 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -180,6 +180,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Periodically unblock failed allocations go s.periodicUnblockFailedEvals(stopCh) + // Periodically publish job summary metrics + go s.publishJobSummaryMetrics(stopCh) + // Setup the heartbeat timers. This is done both when starting up or when // a leader fail over happens. Since the timers are maintained by the leader // node, effectively this means all the timers are renewed at the time of failover. @@ -519,6 +522,52 @@ func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) { } } +// publishJobSummaryMetrics publishes the job summaries as metrics +func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) { + // Using a timer instead of a ticker so that we can publish after the + // current batch of metrics have been published + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-stopCh: + return + case <-timer.C: + state, err := s.State().Snapshot() + if err != nil { + timer.Reset(s.config.StatsCollectionInterval) + s.logger.Printf("[ERR] nomad: failed to get state: %v", err) + continue + } + ws := memdb.NewWatchSet() + iter, err := state.JobSummaries(ws) + if err != nil { + timer.Reset(s.config.StatsCollectionInterval) + s.logger.Printf("[ERR] nomad: failed to get job summaries: %v", err) + continue + } + + for { + raw := iter.Next() + if raw == nil { + break + } + summary := raw.(*structs.JobSummary) + for name, tgSummary := range summary.Summary { + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting)) + metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost)) + } + } + timer.Reset(s.config.StatsCollectionInterval) + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error {