drainer: factor job & node watchers out of drainer.go

This commit is contained in:
Michael Schurter 2018-02-27 14:50:17 -08:00
parent 5922aef623
commit ab0de41884
3 changed files with 261 additions and 240 deletions

View File

@ -4,10 +4,8 @@ import (
"context"
"log"
"strings"
"sync"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
@ -460,244 +458,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
}
}
// nodeWatcher watches for nodes to start or stop draining
type nodeWatcher struct {
index uint64
nodes map[string]*structs.Node
nodesCh chan map[string]*structs.Node
state *state.StateStore
logger *log.Logger
}
func newNodeWatcher(logger *log.Logger, nodes map[string]*structs.Node, index uint64, state *state.StateStore) *nodeWatcher {
return &nodeWatcher{
nodes: nodes,
nodesCh: make(chan map[string]*structs.Node),
index: index,
state: state,
logger: logger,
}
}
func (n *nodeWatcher) run(ctx context.Context) {
// Trigger an initial drain pass if there are already nodes draining
//FIXME this is unneccessary if a node has reached a deadline
n.logger.Printf("[TRACE] nomad.drain: initial draining nodes: %d", len(n.nodes))
if len(n.nodes) > 0 {
n.nodesCh <- n.nodes
}
for {
//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
resp, index, err := n.state.BlockingQuery(n.queryNodeDrain, n.index, ctx)
if err != nil {
if err == context.Canceled {
n.logger.Printf("[TRACE] nomad.drain: draining node watcher shutting down")
return
}
n.logger.Printf("[ERR] nomad.drain: error blocking on node updates at index %d: %v", n.index, err)
return
}
// update index for next run
n.index = index
changed := false
newNodes := resp.([]*structs.Node)
n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
for _, newNode := range newNodes {
if existingNode, ok := n.nodes[newNode.ID]; ok {
// Node was draining, see if it has changed
if !newNode.Drain {
// Node stopped draining
delete(n.nodes, newNode.ID)
changed = true
} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
// Update deadline
n.nodes[newNode.ID] = newNode
changed = true
}
} else {
// Node was not draining
if newNode.Drain {
// Node started draining
n.nodes[newNode.ID] = newNode
changed = true
}
}
}
// Send a copy of the draining nodes if there were changes
if !changed {
continue
}
nodesCopy := make(map[string]*structs.Node, len(n.nodes))
for k, v := range n.nodes {
nodesCopy[k] = v
}
select {
case n.nodesCh <- nodesCopy:
case <-ctx.Done():
return
}
}
}
func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Nodes(ws)
if err != nil {
return nil, 0, err
}
index, err := state.Index("nodes")
if err != nil {
return nil, 0, err
}
resp := make([]*structs.Node, 0, 8)
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
resp = append(resp, node)
}
return resp, index, nil
}
type jobWatcher struct {
// allocsIndex to start watching from
allocsIndex uint64
// job -> node.ID
jobs map[jobKey]string
jobsMu sync.Mutex
jobsCh chan map[jobKey]struct{}
state *state.StateStore
logger *log.Logger
}
func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
return &jobWatcher{
allocsIndex: allocsIndex,
logger: logger,
jobs: jobs,
jobsCh: make(chan map[jobKey]struct{}),
state: state,
}
}
func (j *jobWatcher) watch(k jobKey, nodeID string) {
j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
j.jobsMu.Lock()
j.jobs[k] = nodeID
j.jobsMu.Unlock()
}
func (j *jobWatcher) nodeDone(nodeID string) {
j.jobsMu.Lock()
defer j.jobsMu.Unlock()
for k, v := range j.jobs {
if v == nodeID {
j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
delete(j.jobs, k)
}
}
}
func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
return j.jobsCh
}
func (j *jobWatcher) run(ctx context.Context) {
var resp interface{}
var err error
for {
//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
var newIndex uint64
resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
if err != nil {
if err == context.Canceled {
j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
return
}
j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
return
}
j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
j.allocsIndex = newIndex
changedJobs := resp.(map[jobKey]struct{})
if len(changedJobs) > 0 {
select {
case j.jobsCh <- changedJobs:
case <-ctx.Done():
return
}
}
}
}
func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Allocs(ws)
if err != nil {
return nil, 0, err
}
index, err := state.Index("allocs")
if err != nil {
return nil, 0, err
}
skipped := 0
// job ids
resp := map[jobKey]struct{}{}
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
j.jobsMu.Lock()
_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
j.jobsMu.Unlock()
if !ok {
// alloc is not part of a draining job
skipped++
continue
}
// don't wake drain loop if alloc hasn't updated its health
if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
} else {
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
}
}
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
return resp, index, nil
}
// initDrainer initializes the node drainer state and returns a list of
// draining nodes as well as allocs that are draining that should be watched
// for a replacement.

140
nomad/drainer/jobwatcher.go Normal file
View File

@ -0,0 +1,140 @@
package drainer
import (
"context"
"log"
"sync"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// jobWatcher watches allocation changes for jobs with at least one allocation
// on a draining node.
type jobWatcher struct {
// allocsIndex to start watching from
allocsIndex uint64
// job -> node.ID
jobs map[jobKey]string
jobsMu sync.Mutex
jobsCh chan map[jobKey]struct{}
state *state.StateStore
logger *log.Logger
}
func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
return &jobWatcher{
allocsIndex: allocsIndex,
logger: logger,
jobs: jobs,
jobsCh: make(chan map[jobKey]struct{}),
state: state,
}
}
func (j *jobWatcher) watch(k jobKey, nodeID string) {
j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
j.jobsMu.Lock()
j.jobs[k] = nodeID
j.jobsMu.Unlock()
}
func (j *jobWatcher) nodeDone(nodeID string) {
j.jobsMu.Lock()
defer j.jobsMu.Unlock()
for k, v := range j.jobs {
if v == nodeID {
j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
delete(j.jobs, k)
}
}
}
func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
return j.jobsCh
}
func (j *jobWatcher) run(ctx context.Context) {
var resp interface{}
var err error
for {
//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
var newIndex uint64
resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
if err != nil {
if err == context.Canceled {
j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
return
}
j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
return
}
j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
j.allocsIndex = newIndex
changedJobs := resp.(map[jobKey]struct{})
if len(changedJobs) > 0 {
select {
case j.jobsCh <- changedJobs:
case <-ctx.Done():
return
}
}
}
}
func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Allocs(ws)
if err != nil {
return nil, 0, err
}
index, err := state.Index("allocs")
if err != nil {
return nil, 0, err
}
skipped := 0
// job ids
resp := map[jobKey]struct{}{}
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
j.jobsMu.Lock()
_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
j.jobsMu.Unlock()
if !ok {
// alloc is not part of a draining job
skipped++
continue
}
// don't wake drain loop if alloc hasn't updated its health
if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
} else {
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
}
}
j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
return resp, index, nil
}

View File

@ -0,0 +1,121 @@
package drainer
import (
"context"
"log"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// nodeWatcher watches for nodes to start or stop draining
type nodeWatcher struct {
index uint64
nodes map[string]*structs.Node
nodesCh chan map[string]*structs.Node
state *state.StateStore
logger *log.Logger
}
func newNodeWatcher(logger *log.Logger, nodes map[string]*structs.Node, index uint64, state *state.StateStore) *nodeWatcher {
return &nodeWatcher{
nodes: nodes,
nodesCh: make(chan map[string]*structs.Node),
index: index,
state: state,
logger: logger,
}
}
func (n *nodeWatcher) run(ctx context.Context) {
// Trigger an initial drain pass if there are already nodes draining
//FIXME this is unneccessary if a node has reached a deadline
n.logger.Printf("[TRACE] nomad.drain: initial draining nodes: %d", len(n.nodes))
if len(n.nodes) > 0 {
n.nodesCh <- n.nodes
}
for {
//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
resp, index, err := n.state.BlockingQuery(n.queryNodeDrain, n.index, ctx)
if err != nil {
if err == context.Canceled {
n.logger.Printf("[TRACE] nomad.drain: draining node watcher shutting down")
return
}
n.logger.Printf("[ERR] nomad.drain: error blocking on node updates at index %d: %v", n.index, err)
return
}
// update index for next run
n.index = index
changed := false
newNodes := resp.([]*structs.Node)
n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
for _, newNode := range newNodes {
if existingNode, ok := n.nodes[newNode.ID]; ok {
// Node was draining, see if it has changed
if !newNode.Drain {
// Node stopped draining
delete(n.nodes, newNode.ID)
changed = true
} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
// Update deadline
n.nodes[newNode.ID] = newNode
changed = true
}
} else {
// Node was not draining
if newNode.Drain {
// Node started draining
n.nodes[newNode.ID] = newNode
changed = true
}
}
}
// Send a copy of the draining nodes if there were changes
if !changed {
continue
}
nodesCopy := make(map[string]*structs.Node, len(n.nodes))
for k, v := range n.nodes {
nodesCopy[k] = v
}
select {
case n.nodesCh <- nodesCopy:
case <-ctx.Done():
return
}
}
}
func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
iter, err := state.Nodes(ws)
if err != nil {
return nil, 0, err
}
index, err := state.Index("nodes")
if err != nil {
return nil, 0, err
}
resp := make([]*structs.Node, 0, 8)
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
resp = append(resp, node)
}
return resp, index, nil
}