diff --git a/nomad/drainer/drain.go b/nomad/drainer/drain.go
index 5175f609f..5f35bca6c 100644
--- a/nomad/drainer/drain.go
+++ b/nomad/drainer/drain.go
@@ -4,10 +4,8 @@ import (
 	"context"
 	"log"
 	"strings"
-	"sync"
 	"time"

-	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/state"
@@ -460,244 +458,6 @@ func (n *NodeDrainer) nodeDrainer(ctx context.Context, state *state.StateStore)
 	}
 }

-// nodeWatcher watches for nodes to start or stop draining
-type nodeWatcher struct {
-	index   uint64
-	nodes   map[string]*structs.Node
-	nodesCh chan map[string]*structs.Node
-	state   *state.StateStore
-	logger  *log.Logger
-}
-
-func newNodeWatcher(logger *log.Logger, nodes map[string]*structs.Node, index uint64, state *state.StateStore) *nodeWatcher {
-	return &nodeWatcher{
-		nodes:   nodes,
-		nodesCh: make(chan map[string]*structs.Node),
-		index:   index,
-		state:   state,
-		logger:  logger,
-	}
-}
-
-func (n *nodeWatcher) run(ctx context.Context) {
-	// Trigger an initial drain pass if there are already nodes draining
-	//FIXME this is unneccessary if a node has reached a deadline
-	n.logger.Printf("[TRACE] nomad.drain: initial draining nodes: %d", len(n.nodes))
-	if len(n.nodes) > 0 {
-		n.nodesCh <- n.nodes
-	}
-
-	for {
-		//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
-		resp, index, err := n.state.BlockingQuery(n.queryNodeDrain, n.index, ctx)
-		if err != nil {
-			if err == context.Canceled {
-				n.logger.Printf("[TRACE] nomad.drain: draining node watcher shutting down")
-				return
-			}
-			n.logger.Printf("[ERR] nomad.drain: error blocking on node updates at index %d: %v", n.index, err)
-			return
-		}
-
-		// update index for next run
-		n.index = index
-
-		changed := false
-		newNodes := resp.([]*structs.Node)
-		n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
-		for _, newNode := range newNodes {
-			if existingNode, ok := n.nodes[newNode.ID]; ok {
-				// Node was draining, see if it has changed
-				if !newNode.Drain {
-					// Node stopped draining
-					delete(n.nodes, newNode.ID)
-					changed = true
-				} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
-					// Update deadline
-					n.nodes[newNode.ID] = newNode
-					changed = true
-				}
-			} else {
-				// Node was not draining
-				if newNode.Drain {
-					// Node started draining
-					n.nodes[newNode.ID] = newNode
-					changed = true
-				}
-			}
-		}
-
-		// Send a copy of the draining nodes if there were changes
-		if !changed {
-			continue
-		}
-
-		nodesCopy := make(map[string]*structs.Node, len(n.nodes))
-		for k, v := range n.nodes {
-			nodesCopy[k] = v
-		}
-
-		select {
-		case n.nodesCh <- nodesCopy:
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
-	iter, err := state.Nodes(ws)
-	if err != nil {
-		return nil, 0, err
-	}
-
-	index, err := state.Index("nodes")
-	if err != nil {
-		return nil, 0, err
-	}
-
-	resp := make([]*structs.Node, 0, 8)
-
-	for {
-		raw := iter.Next()
-		if raw == nil {
-			break
-		}
-
-		node := raw.(*structs.Node)
-		resp = append(resp, node)
-	}
-
-	return resp, index, nil
-}
-
-type jobWatcher struct {
-	// allocsIndex to start watching from
-	allocsIndex uint64
-
-	// job -> node.ID
-	jobs   map[jobKey]string
-	jobsMu sync.Mutex
-
-	jobsCh chan map[jobKey]struct{}
-
-	state *state.StateStore
-
-	logger *log.Logger
-}
-
-func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
-	return &jobWatcher{
-		allocsIndex: allocsIndex,
-		logger:      logger,
-		jobs:        jobs,
-		jobsCh:      make(chan map[jobKey]struct{}),
-		state:       state,
-	}
-}
-
-func (j *jobWatcher) watch(k jobKey, nodeID string) {
-	j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
-	j.jobsMu.Lock()
-	j.jobs[k] = nodeID
-	j.jobsMu.Unlock()
-}
-
-func (j *jobWatcher) nodeDone(nodeID string) {
-	j.jobsMu.Lock()
-	defer j.jobsMu.Unlock()
-	for k, v := range j.jobs {
-		if v == nodeID {
-			j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
-			delete(j.jobs, k)
-		}
-	}
-}
-
-func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
-	return j.jobsCh
-}
-
-func (j *jobWatcher) run(ctx context.Context) {
-	var resp interface{}
-	var err error
-
-	for {
-		//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
-		//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
-		var newIndex uint64
-		resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
-		if err != nil {
-			if err == context.Canceled {
-				j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
-				return
-			}
-			j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
-			return
-		}
-
-		j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
-		j.allocsIndex = newIndex
-
-		changedJobs := resp.(map[jobKey]struct{})
-		if len(changedJobs) > 0 {
-			select {
-			case j.jobsCh <- changedJobs:
-			case <-ctx.Done():
-				return
-			}
-		}
-	}
-}
-
-func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
-	iter, err := state.Allocs(ws)
-	if err != nil {
-		return nil, 0, err
-	}
-
-	index, err := state.Index("allocs")
-	if err != nil {
-		return nil, 0, err
-	}
-
-	skipped := 0
-
-	// job ids
-	resp := map[jobKey]struct{}{}
-
-	for {
-		raw := iter.Next()
-		if raw == nil {
-			break
-		}
-
-		alloc := raw.(*structs.Allocation)
-
-		j.jobsMu.Lock()
-		_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
-		j.jobsMu.Unlock()
-
-		if !ok {
-			// alloc is not part of a draining job
-			skipped++
-			continue
-		}
-
-		// don't wake drain loop if alloc hasn't updated its health
-		if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
-			j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
-			resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
-		} else {
-			j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
-		}
-	}
-
-	j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
-
-	return resp, index, nil
-}
-
 // initDrainer initializes the node drainer state and returns a list of
 // draining nodes as well as allocs that are draining that should be watched
 // for a replacement.
diff --git a/nomad/drainer/jobwatcher.go b/nomad/drainer/jobwatcher.go
new file mode 100644
index 000000000..95a1be5d1
--- /dev/null
+++ b/nomad/drainer/jobwatcher.go
@@ -0,0 +1,140 @@
+package drainer
+
+import (
+	"context"
+	"log"
+	"sync"
+
+	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// jobWatcher watches allocation changes for jobs with at least one allocation
+// on a draining node.
+type jobWatcher struct {
+	// allocsIndex to start watching from
+	allocsIndex uint64
+
+	// job -> node.ID
+	jobs   map[jobKey]string
+	jobsMu sync.Mutex
+
+	jobsCh chan map[jobKey]struct{}
+
+	state *state.StateStore
+
+	logger *log.Logger
+}
+
+func newJobWatcher(logger *log.Logger, jobs map[jobKey]string, allocsIndex uint64, state *state.StateStore) *jobWatcher {
+	return &jobWatcher{
+		allocsIndex: allocsIndex,
+		logger:      logger,
+		jobs:        jobs,
+		jobsCh:      make(chan map[jobKey]struct{}),
+		state:       state,
+	}
+}
+
+func (j *jobWatcher) watch(k jobKey, nodeID string) {
+	j.logger.Printf("[TRACE] nomad.drain: watching job %s on draining node %s", k.jobid, nodeID[:6])
+	j.jobsMu.Lock()
+	j.jobs[k] = nodeID
+	j.jobsMu.Unlock()
+}
+
+func (j *jobWatcher) nodeDone(nodeID string) {
+	j.jobsMu.Lock()
+	defer j.jobsMu.Unlock()
+	for k, v := range j.jobs {
+		if v == nodeID {
+			j.logger.Printf("[TRACE] nomad.drain: UNwatching job %s on done draining node %s", k.jobid, nodeID[:6])
+			delete(j.jobs, k)
+		}
+	}
+}
+
+func (j *jobWatcher) WaitCh() <-chan map[jobKey]struct{} {
+	return j.jobsCh
+}
+
+func (j *jobWatcher) run(ctx context.Context) {
+	var resp interface{}
+	var err error
+
+	for {
+		//FIXME have watchAllocs create a closure and give it a copy of j.jobs to remove locking?
+		//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
+		var newIndex uint64
+		resp, newIndex, err = j.state.BlockingQuery(j.watchAllocs, j.allocsIndex, ctx)
+		if err != nil {
+			if err == context.Canceled {
+				j.logger.Printf("[TRACE] nomad.drain: job watcher shutting down")
+				return
+			}
+			j.logger.Printf("[ERR] nomad.drain: error blocking on alloc updates: %v", err)
+			return
+		}
+
+		j.logger.Printf("[TRACE] nomad.drain: job watcher old index: %d new index: %d", j.allocsIndex, newIndex)
+		j.allocsIndex = newIndex
+
+		changedJobs := resp.(map[jobKey]struct{})
+		if len(changedJobs) > 0 {
+			select {
+			case j.jobsCh <- changedJobs:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+func (j *jobWatcher) watchAllocs(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
+	iter, err := state.Allocs(ws)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	index, err := state.Index("allocs")
+	if err != nil {
+		return nil, 0, err
+	}
+
+	skipped := 0
+
+	// job ids
+	resp := map[jobKey]struct{}{}
+
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+
+		alloc := raw.(*structs.Allocation)
+
+		j.jobsMu.Lock()
+		_, ok := j.jobs[jobKey{alloc.Namespace, alloc.JobID}]
+		j.jobsMu.Unlock()
+
+		if !ok {
+			// alloc is not part of a draining job
+			skipped++
+			continue
+		}
+
+		// don't wake drain loop if alloc hasn't updated its health
+		if alloc.DeploymentStatus.IsHealthy() || alloc.DeploymentStatus.IsUnhealthy() {
+			j.logger.Printf("[TRACE] nomad.drain: job watcher found alloc %s - deployment status: %t", alloc.ID[:6], *alloc.DeploymentStatus.Healthy)
+			resp[jobKey{alloc.Namespace, alloc.JobID}] = struct{}{}
+		} else {
+			j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring alloc %s - no deployment status", alloc.ID[:6])
+		}
+	}
+
+	j.logger.Printf("[TRACE] nomad.drain: job watcher ignoring %d allocs - not part of draining job at index %d", skipped, index)
+
+	return resp, index, nil
+}
diff --git a/nomad/drainer/nodewatcher.go b/nomad/drainer/nodewatcher.go
new file mode 100644
index 000000000..eb54e4995
--- /dev/null
+++ b/nomad/drainer/nodewatcher.go
@@ -0,0 +1,121 @@
+package drainer
+
+import (
+	"context"
+	"log"
+
+	memdb "github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/nomad/nomad/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// nodeWatcher watches for nodes to start or stop draining
+type nodeWatcher struct {
+	index   uint64
+	nodes   map[string]*structs.Node
+	nodesCh chan map[string]*structs.Node
+	state   *state.StateStore
+	logger  *log.Logger
+}
+
+func newNodeWatcher(logger *log.Logger, nodes map[string]*structs.Node, index uint64, state *state.StateStore) *nodeWatcher {
+	return &nodeWatcher{
+		nodes:   nodes,
+		nodesCh: make(chan map[string]*structs.Node),
+		index:   index,
+		state:   state,
+		logger:  logger,
+	}
+}
+
+func (n *nodeWatcher) run(ctx context.Context) {
+	// Trigger an initial drain pass if there are already nodes draining
+	//FIXME this is unnecessary if a node has reached a deadline
+	n.logger.Printf("[TRACE] nomad.drain: initial draining nodes: %d", len(n.nodes))
+	if len(n.nodes) > 0 {
+		n.nodesCh <- n.nodes
+	}
+
+	for {
+		//FIXME it seems possible for this to return a nil error and a 0 index, what to do in that case?
+		resp, index, err := n.state.BlockingQuery(n.queryNodeDrain, n.index, ctx)
+		if err != nil {
+			if err == context.Canceled {
+				n.logger.Printf("[TRACE] nomad.drain: draining node watcher shutting down")
+				return
+			}
+			n.logger.Printf("[ERR] nomad.drain: error blocking on node updates at index %d: %v", n.index, err)
+			return
+		}
+
+		// update index for next run
+		n.index = index
+
+		changed := false
+		newNodes := resp.([]*structs.Node)
+		n.logger.Printf("[TRACE] nomad.drain: %d nodes to consider", len(newNodes)) //FIXME remove
+		for _, newNode := range newNodes {
+			if existingNode, ok := n.nodes[newNode.ID]; ok {
+				// Node was draining, see if it has changed
+				if !newNode.Drain {
+					// Node stopped draining
+					delete(n.nodes, newNode.ID)
+					changed = true
+				} else if !newNode.DrainStrategy.DeadlineTime().Equal(existingNode.DrainStrategy.DeadlineTime()) {
+					// Update deadline
+					n.nodes[newNode.ID] = newNode
+					changed = true
+				}
+			} else {
+				// Node was not draining
+				if newNode.Drain {
+					// Node started draining
+					n.nodes[newNode.ID] = newNode
+					changed = true
+				}
+			}
+		}
+
+		// Send a copy of the draining nodes if there were changes
+		if !changed {
+			continue
+		}
+
+		nodesCopy := make(map[string]*structs.Node, len(n.nodes))
+		for k, v := range n.nodes {
+			nodesCopy[k] = v
+		}
+
+		select {
+		case n.nodesCh <- nodesCopy:
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (n *nodeWatcher) queryNodeDrain(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
+	iter, err := state.Nodes(ws)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	index, err := state.Index("nodes")
+	if err != nil {
+		return nil, 0, err
+	}
+
+	resp := make([]*structs.Node, 0, 8)
+
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+
+		node := raw.(*structs.Node)
+		resp = append(resp, node)
+	}
+
+	return resp, index, nil
+}
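For reviewers, a minimal sketch of how the two extracted watchers might be wired together. This is illustrative only and not part of the patch: the function name exampleDrainLoop, the empty initial maps, and the starting index of 1 are assumptions; the real wiring lives in NodeDrainer.nodeDrainer and is seeded from initDrainer.

// Illustrative sketch (not part of this change), assumed to live inside
// package drainer so jobKey, state, and structs resolve via existing imports.
func exampleDrainLoop(ctx context.Context, logger *log.Logger, store *state.StateStore) {
	// Assumed starting state: no draining nodes, no watched jobs, index 1.
	nw := newNodeWatcher(logger, map[string]*structs.Node{}, 1, store)
	go nw.run(ctx)

	jw := newJobWatcher(logger, map[jobKey]string{}, 1, store)
	go jw.run(ctx)

	for {
		select {
		case nodes := <-nw.nodesCh:
			// A node started or stopped draining, or its deadline changed;
			// the drain loop would recompute which allocs to migrate here.
			logger.Printf("[TRACE] example: %d draining nodes", len(nodes))
		case jobs := <-jw.WaitCh():
			// An alloc of a watched job reported deployment health; the drain
			// loop would check whether replacements are healthy here.
			logger.Printf("[TRACE] example: %d jobs changed", len(jobs))
		case <-ctx.Done():
			return
		}
	}
}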