Add metrics to show allocations on the client

This PR adds the following gauges to the client, each keyed by node ID:
client.allocations.migrating
client.allocations.blocked
client.allocations.pending
client.allocations.running
client.allocations.terminal

It also adds fields that were missing from the API version of the Evaluation struct: ClassEligibility, EscapedComputedClass, AnnotatePlan, and SnapshotIndex.
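
For anyone wiring these into a metrics pipeline, here is a minimal, self-contained sketch (not Nomad's own setup; the sink configuration and node ID are illustrative) of how gauges with these keys flow through github.com/armon/go-metrics, the library behind the metrics.SetGauge calls in this diff:

package main

import (
    "fmt"
    "time"

    metrics "github.com/armon/go-metrics"
)

func main() {
    // Collect into an in-memory sink so we can print the gauges; a real
    // deployment would point Nomad's telemetry config at statsite/statsd.
    sink := metrics.NewInmemSink(10*time.Second, time.Minute)
    conf := metrics.DefaultConfig("nomad")
    conf.EnableHostname = false
    metrics.NewGlobal(conf, sink)

    nodeID := "1f2e3d4c-hypothetical-node" // illustrative node ID
    metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, 3)
    metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, 1)

    // Dump what was recorded; keys flatten to
    // nomad.client.allocations.<state>.<node-id>.
    for _, interval := range sink.Data() {
        for name, value := range interval.Gauges {
            fmt.Println(name, value)
        }
    }
}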
Alex Dadgar committed 2017-03-09 12:37:41 -08:00
commit 9011a7984c, parent ee0be46c49
4 changed files with 68 additions and 28 deletions

api/evaluations.go

@@ -54,24 +54,28 @@ func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*Allocation
 
 // Evaluation is used to serialize an evaluation.
 type Evaluation struct {
-    ID                string
-    Priority          int
-    Type              string
-    TriggeredBy       string
-    JobID             string
-    JobModifyIndex    uint64
-    NodeID            string
-    NodeModifyIndex   uint64
-    Status            string
-    StatusDescription string
-    Wait              time.Duration
-    NextEval          string
-    PreviousEval      string
-    BlockedEval       string
-    FailedTGAllocs    map[string]*AllocationMetric
-    QueuedAllocations map[string]int
-    CreateIndex       uint64
-    ModifyIndex       uint64
+    ID                   string
+    Priority             int
+    Type                 string
+    TriggeredBy          string
+    JobID                string
+    JobModifyIndex       uint64
+    NodeID               string
+    NodeModifyIndex      uint64
+    Status               string
+    StatusDescription    string
+    Wait                 time.Duration
+    NextEval             string
+    PreviousEval         string
+    BlockedEval          string
+    FailedTGAllocs       map[string]*AllocationMetric
+    ClassEligibility     map[string]bool
+    EscapedComputedClass bool
+    AnnotatePlan         bool
+    QueuedAllocations    map[string]int
+    SnapshotIndex        uint64
+    CreateIndex          uint64
+    ModifyIndex          uint64
 }
 
 // EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
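
A hedged sketch of reading the newly exposed fields through the Go API client (github.com/hashicorp/nomad/api), assuming default client config; the evaluation ID here is hypothetical:

package main

import (
    "fmt"
    "log"

    "github.com/hashicorp/nomad/api"
)

func main() {
    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }

    // Hypothetical evaluation ID; use one returned by a job registration.
    eval, _, err := client.Evaluations().Info("c0ffee11-hypothetical-eval-id", nil)
    if err != nil {
        log.Fatal(err)
    }

    // Fields newly exposed by this change:
    fmt.Println("snapshot index:", eval.SnapshotIndex)
    fmt.Println("escaped computed class:", eval.EscapedComputedClass)
    fmt.Println("class eligibility:", eval.ClassEligibility)
}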

client/alloc_runner.go

@@ -584,6 +584,8 @@ func (r *AllocRunner) handleDestroy() {
             r.logger.Printf("[ERR] client: failed to destroy state for alloc '%s': %v",
                 r.alloc.ID, err)
         }
+    case <-r.updateCh:
+        r.logger.Printf("[ERR] client: dropping update to terminal alloc '%s'", r.alloc.ID)
     }
 }
 
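
The new case above exists so that senders on updateCh never block once the alloc runner has gone terminal. A self-contained sketch of the same pattern, with hypothetical names and channel types:

package main

import (
    "fmt"
    "time"
)

// handleDestroy mimics the alloc runner's terminal state: it waits for
// destruction but keeps draining updates so senders never block.
func handleDestroy(updateCh <-chan string, destroyCh <-chan struct{}) {
    for {
        select {
        case <-destroyCh:
            return
        case update := <-updateCh:
            // Terminal: drop the update instead of applying it.
            fmt.Printf("[ERR] dropping update to terminal alloc: %s\n", update)
        }
    }
}

func main() {
    updateCh := make(chan string)
    destroyCh := make(chan struct{})
    go handleDestroy(updateCh, destroyCh)

    updateCh <- "updated alloc" // would deadlock without the draining case
    time.Sleep(50 * time.Millisecond)
    close(destroyCh)
}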

client/client.go

@@ -310,7 +310,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
     go c.run()
 
     // Start collecting stats
-    go c.collectHostStats()
+    go c.emitStats()
 
     c.logger.Printf("[INFO] client: Node ID %q", c.Node().ID)
     return c, nil
@@ -2170,8 +2170,8 @@ func (c *Client) consulReaperImpl() error {
     return c.consulSyncer.ReapUnmatched(domains)
 }
 
-// collectHostStats collects host resource usage stats periodically
-func (c *Client) collectHostStats() {
+// emitStats collects host resource usage stats periodically
+func (c *Client) emitStats() {
     // Start collecting host stats right away and then keep collecting every
     // collection interval
     next := time.NewTimer(0)
@@ -2188,16 +2188,18 @@ func (c *Client) collectHostStats() {
 
             // Publish Node metrics if operator has opted in
             if c.config.PublishNodeMetrics {
-                c.emitStats(c.hostStatsCollector.Stats())
+                c.emitHostStats(c.hostStatsCollector.Stats())
             }
+
+            c.emitClientMetrics()
         case <-c.shutdownCh:
             return
         }
     }
 }
 
-// emitStats pushes host resource usage stats to remote metrics collection sinks
-func (c *Client) emitStats(hStats *stats.HostStats) {
+// emitHostStats pushes host resource usage stats to remote metrics collection sinks
+func (c *Client) emitHostStats(hStats *stats.HostStats) {
     nodeID := c.Node().ID
     metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
     metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
@@ -2263,6 +2265,38 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
     }
 }
 
+// emitClientMetrics emits lower volume client metrics
+func (c *Client) emitClientMetrics() {
+    nodeID := c.Node().ID
+
+    // Emit allocation metrics
+    c.migratingAllocsLock.Lock()
+    migrating := len(c.migratingAllocs)
+    c.migratingAllocsLock.Unlock()
+
+    c.blockedAllocsLock.Lock()
+    blocked := len(c.blockedAllocations)
+    c.blockedAllocsLock.Unlock()
+
+    pending, running, terminal := 0, 0, 0
+    for _, ar := range c.getAllocRunners() {
+        switch ar.Alloc().ClientStatus {
+        case structs.AllocClientStatusPending:
+            pending++
+        case structs.AllocClientStatusRunning:
+            running++
+        case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
+            terminal++
+        }
+    }
+
+    metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
+    metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
+    metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
+    metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
+    metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
+}
+
 func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {
     // Unfortunately the allocs only have IP so we need to match them to the
     // device
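
The emission loop that drives these gauges (emitStats above) follows a standard Go timer pattern: fire immediately via a zero-duration timer, re-arm after each collection, and exit when the shutdown channel closes. A runnable sketch with illustrative names and interval:

package main

import (
    "fmt"
    "time"
)

func emitLoop(interval time.Duration, shutdownCh <-chan struct{}) {
    // A zero-duration timer fires the first collection immediately.
    next := time.NewTimer(0)
    defer next.Stop()

    for {
        select {
        case <-next.C:
            fmt.Println("collect host stats, emit client metrics")
            next.Reset(interval)
        case <-shutdownCh:
            return
        }
    }
}

func main() {
    shutdownCh := make(chan struct{})
    go emitLoop(time.Second, shutdownCh)
    time.Sleep(2500 * time.Millisecond) // observe a few ticks
    close(shutdownCh)
}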

nomad/structs/structs.go

@@ -3827,15 +3827,15 @@ type Evaluation struct {
     // during the evaluation. This should not be set during normal operations.
     AnnotatePlan bool
 
+    // QueuedAllocations is the number of unplaced allocations at the time the
+    // evaluation was processed. The map is keyed by Task Group names.
+    QueuedAllocations map[string]int
+
     // SnapshotIndex is the Raft index of the snapshot used to process the
     // evaluation. As such it will only be set once it has gone through the
     // scheduler.
     SnapshotIndex uint64
 
-    // QueuedAllocations is the number of unplaced allocations at the time the
-    // evaluation was processed. The map is keyed by Task Group names.
-    QueuedAllocations map[string]int
-
     // Raft Indexes
     CreateIndex uint64
     ModifyIndex uint64