open-nomad/client/drain.go
Tim Gross 62548616d4
client: allow drain_on_shutdown configuration (#16827)
Adds a new configuration to clients to optionally allow them to drain their
workloads on shutdown. The client sends the `Node.UpdateDrain` RPC targeting
itself and then monitors the drain state as seen by the server until the drain
is complete or the deadline expires. If it loses connection with the server, it
will monitor local client status instead to ensure allocations are stopped
before exiting.
2023-04-14 15:35:32 -04:00
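For reference, a minimal sketch of the configuration this code consumes, using only the config.DrainConfig fields that DrainSelf reads (Deadline and IgnoreSystemJobs). The helper name and the 15-minute value are illustrative, and the assumption that the client config exposes a Drain field comes from the c.GetConfig().Drain call below:

// exampleDrainOnShutdownConfig is illustrative only (not part of this file):
// it builds a client config with self-drain enabled, using the two
// DrainConfig fields that DrainSelf reads below.
func exampleDrainOnShutdownConfig() *config.Config {
	return &config.Config{
		Drain: &config.DrainConfig{
			Deadline:         15 * time.Minute, // upper bound on how long the self-drain may run
			IgnoreSystemJobs: true,             // leave system job allocations running during the drain
		},
	}
}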


package client

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/structs"
)
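
// DrainSelf drains the client's own workloads on shutdown when the client is
// configured to do so. It marks this node as draining and ineligible via the
// Node.UpdateDrain RPC, then blocks until the server reports the drain is
// complete or the deadline expires, falling back to local allocation status
// checks if the server can't be reached.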
func (c *Client) DrainSelf() error {
	drainSpec := c.GetConfig().Drain
	if drainSpec == nil {
		return nil
	}

	logger := c.logger.Named("drain")

	now := time.Now()
	drainReq := &structs.NodeUpdateDrainRequest{
		NodeID: c.NodeID(),
		DrainStrategy: &structs.DrainStrategy{
			DrainSpec: structs.DrainSpec{
				Deadline:         drainSpec.Deadline,
				IgnoreSystemJobs: drainSpec.IgnoreSystemJobs,
			},
			StartedAt: now,
		},
		MarkEligible: false,
		Meta:         map[string]string{"message": "shutting down"},
		WriteRequest: structs.WriteRequest{
			Region: c.Region(), AuthToken: c.secretNodeID()},
	}
	if drainSpec.Deadline > 0 {
		drainReq.DrainStrategy.ForceDeadline = now.Add(drainSpec.Deadline)
	}

	var drainResp structs.NodeDrainUpdateResponse
	err := c.RPC("Node.UpdateDrain", drainReq, &drainResp)
	if err != nil {
		return err
	}

	// note: the default deadline is 1h but it can be set to "" to disable it.
	// Letting the drain run forever seems wrong, but the init system (ex.
	// systemd) will almost always force-kill the client eventually.
	ctx := context.Background()
	var cancel context.CancelFunc
	if drainSpec.Deadline > 0 {
		// If we set this context to the exact deadline, the server will reach
		// the deadline but not get a chance to record it before this context
		// expires, resulting in spurious errors. So extend the deadline here
		// by a few seconds.
		ctx, cancel = context.WithTimeout(context.Background(), drainSpec.Deadline+(5*time.Second))
		defer cancel()
	}

	statusCheckInterval := time.Second

	logger.Info("monitoring self-drain")
	err = c.pollServerForDrainStatus(ctx, statusCheckInterval)
	switch err {
	case nil:
		logger.Debug("self-drain complete")
		return nil
	case context.DeadlineExceeded, context.Canceled:
		logger.Error("self-drain exceeded deadline")
		return fmt.Errorf("self-drain exceeded deadline")
	default:
		logger.Error("could not check node status, falling back to local status checks", "error", err)
	}

	err = c.pollLocalStatusForDrainStatus(ctx, statusCheckInterval, drainSpec)
	if err != nil {
		return fmt.Errorf("self-drain exceeded deadline")
	}

	logger.Debug("self-drain complete")
	return nil
}

// pollServerForDrainStatus will poll the server periodically for the client's
// drain status, returning an error if the context expires or if the RPC call
// returns an error. If this function returns nil, the drain was successful.
func (c *Client) pollServerForDrainStatus(ctx context.Context, interval time.Duration) error {
	timer, stop := helper.NewSafeTimer(0)
	defer stop()

	statusReq := &structs.NodeSpecificRequest{
		NodeID:   c.NodeID(),
		SecretID: c.secretNodeID(),
		QueryOptions: structs.QueryOptions{
			Region: c.Region(), AuthToken: c.secretNodeID()},
	}
	var statusResp structs.SingleNodeResponse

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			err := c.RPC("Node.GetNode", statusReq, &statusResp)
			if err != nil {
				return err
			}
			if statusResp.Node != nil && statusResp.Node.DrainStrategy == nil {
				return nil
			}
			timer.Reset(interval)
		}
	}
}

// pollLocalStatusForDrainStatus polls the local allocrunner state periodically
// for the client status of all allocation runners, returning an error if the
// context expires. If this function returns nil, the drain was successful.
// This is a fallback function in case polling the server fails.
func (c *Client) pollLocalStatusForDrainStatus(ctx context.Context,
	interval time.Duration, drainSpec *config.DrainConfig) error {

	// drainIsDone is its own function scope so we can release the allocLock
	// between poll attempts
	drainIsDone := func() bool {
		c.allocLock.RLock()
		defer c.allocLock.RUnlock()
		for _, runner := range c.allocs {
			// note: allocs in runners should never be nil or have a nil Job,
			// but if they do we can safely assume the runner is done with it
			alloc := runner.Alloc()
			if alloc != nil && !alloc.ClientTerminalStatus() {
				if !drainSpec.IgnoreSystemJobs {
					return false
				}
				if alloc.Job == nil {
					continue
				}
				if alloc.Job.Type != structs.JobTypeSystem {
					return false
				}
			}
		}
		return true
	}

	timer, stop := helper.NewSafeTimer(0)
	defer stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			if drainIsDone() {
				return nil
			}
			timer.Reset(interval)
		}
	}
}