From 28bac139cb07da4a664261f32dc832aef3c6ae35 Mon Sep 17 00:00:00 2001
From: Lang Martin
Date: Mon, 13 Apr 2020 16:08:24 -0400
Subject: [PATCH] client/heartbeatstop: destroy allocs when disconnected from servers

- track lastHeartbeat, the client-local time of the last successful heartbeat round trip
- track allocations with `stop_after_client_disconnect` configured
- trigger allocation destroy (which handles cleanup)
- restore heartbeat/killable allocs tracking when allocs are recovered from disk
- on client restart, stop those allocs after a grace period if the servers are still partitioned
---
 client/client.go               |  37 ++++++--
 client/heartbeatstop.go        | 163 +++++++++++++++++++++++++++++++++
 client/heartbeatstop_test.go   |  72 +++++++++++++++
 client/state/interface.go      |   9 ++
 client/state/memdb.go          |  17 ++++
 client/state/noopdb.go         |  10 ++
 client/state/state_database.go |  51 +++++++++++
 nomad/structs/structs.go       |   4 +
 8 files changed, 357 insertions(+), 6 deletions(-)
 create mode 100644 client/heartbeatstop.go
 create mode 100644 client/heartbeatstop_test.go

diff --git a/client/client.go b/client/client.go
index d5eb2a13a..367a601b3 100644
--- a/client/client.go
+++ b/client/client.go
@@ -180,10 +180,10 @@ type Client struct {
 	servers *servers.Manager
 
 	// heartbeat related times for tracking how often to heartbeat
-	lastHeartbeat   time.Time
 	heartbeatTTL    time.Duration
 	haveHeartbeated bool
 	heartbeatLock   sync.Mutex
+	heartbeatStop   *heartbeatStop
 
 	// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
 	triggerDiscoveryCh chan struct{}
@@ -371,6 +371,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
 	})
 
+	// Create heartbeatStop and restore its previous state from the state store; this must run after the stateDB is initialized
+	c.heartbeatStop = newHeartbeatStop(c.stateDB, c.getAllocRunner, logger, c.shutdownCh)
+
 	// Setup the clients RPC server
 	c.setupClientRpc()
 
@@ -448,6 +451,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 	// updates sent to the server on startup.
 	go c.batchFirstFingerprints()
 
+	// Watch for disconnection, and stop allocs that are configured with a maximum
+	// lifetime when out of touch with the server
+	go c.heartbeatStop.watch()
+
 	// Add the stats collector
 	statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
 	c.hostStatsCollector = statsCollector
@@ -765,7 +772,7 @@ func (c *Client) Stats() map[string]map[string]string {
 			"node_id":         c.NodeID(),
 			"known_servers":   strings.Join(c.GetServers(), ","),
 			"num_allocations": strconv.Itoa(c.NumAllocs()),
-			"last_heartbeat":  fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
+			"last_heartbeat":  fmt.Sprintf("%v", time.Since(c.lastHeartbeat())),
 			"heartbeat_ttl":   fmt.Sprintf("%v", c.heartbeatTTL),
 		},
 		"runtime": hstats.RuntimeStats(),
@@ -1113,10 +1120,21 @@ func (c *Client) restoreState() error {
 			continue
 		}
 
+		// Maybe mark the alloc for halt on missing server heartbeats
+		if c.heartbeatStop.shouldStop(alloc) {
+			err = c.heartbeatStop.stopAlloc(alloc.ID)
+			if err != nil {
+				c.logger.Error("error stopping alloc", "error", err, "alloc_id", alloc.ID)
+			}
+			continue
+		}
+
 		//XXX is this locking necessary?
 		c.allocLock.Lock()
 		c.allocs[alloc.ID] = ar
 		c.allocLock.Unlock()
+
+		c.heartbeatStop.allocHook(alloc)
 	}
 
 	// All allocs restored successfully, run them!
@@ -1532,6 +1550,10 @@ func (c *Client) registerAndHeartbeat() {
 	}
 }
 
+func (c *Client) lastHeartbeat() time.Time {
+	return c.heartbeatStop.getLastOk()
+}
+
 // getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
 // another heartbeat.
 func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
@@ -1542,7 +1564,7 @@ func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
 	// Collect the useful heartbeat info
 	c.heartbeatLock.Lock()
 	haveHeartbeated := c.haveHeartbeated
-	last := c.lastHeartbeat
+	last := c.lastHeartbeat()
 	ttl := c.heartbeatTTL
 	c.heartbeatLock.Unlock()
 
@@ -1742,7 +1764,7 @@ func (c *Client) registerNode() error {
 
 	c.heartbeatLock.Lock()
 	defer c.heartbeatLock.Unlock()
-	c.lastHeartbeat = time.Now()
+	c.heartbeatStop.setLastOk()
 	c.heartbeatTTL = resp.HeartbeatTTL
 	return nil
 }
@@ -1768,10 +1790,10 @@ func (c *Client) updateNodeStatus() error {
 
 	// Update the last heartbeat and the new TTL, capturing the old values
 	c.heartbeatLock.Lock()
-	last := c.lastHeartbeat
+	last := c.lastHeartbeat()
 	oldTTL := c.heartbeatTTL
 	haveHeartbeated := c.haveHeartbeated
-	c.lastHeartbeat = time.Now()
+	c.heartbeatStop.setLastOk()
 	c.heartbeatTTL = resp.HeartbeatTTL
 	c.haveHeartbeated = true
 	c.heartbeatLock.Unlock()
@@ -2368,6 +2390,9 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
 	// Store the alloc runner.
 	c.allocs[alloc.ID] = ar
 
+	// Maybe mark the alloc for halt on missing server heartbeats
+	c.heartbeatStop.allocHook(alloc)
+
 	go ar.Run()
 	return nil
 }
diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go
new file mode 100644
index 000000000..3186bed70
--- /dev/null
+++ b/client/heartbeatstop.go
@@ -0,0 +1,163 @@
+package client
+
+import (
+	"sync"
+	"time"
+
+	hclog "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/client/state"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+type heartbeatStop struct {
+	lastOk        time.Time
+	allocInterval map[string]time.Duration
+	allocHookCh   chan *structs.Allocation
+	getRunner     func(string) (AllocRunner, error)
+	logger        hclog.InterceptLogger
+	state         state.StateDB
+	shutdownCh    chan struct{}
+	lock          *sync.RWMutex
+}
+
+func newHeartbeatStop(
+	state state.StateDB,
+	getRunner func(string) (AllocRunner, error),
+	logger hclog.InterceptLogger,
+	shutdownCh chan struct{}) *heartbeatStop {
+
+	h := &heartbeatStop{
+		allocInterval: make(map[string]time.Duration),
+		allocHookCh:   make(chan *structs.Allocation),
+		getRunner:     getRunner,
+		logger:        logger,
+		state:         state,
+		shutdownCh:    shutdownCh,
+		lock:          &sync.RWMutex{},
+	}
+
+	if state != nil {
+		lastOk, err := state.GetLastHeartbeatOk()
+		if err == nil && !lastOk.IsZero() {
+			h.lastOk = lastOk
+		}
+	}
+
+	return h
+}
+
+// allocHook is called after (re)storing a new AllocRunner in the client. It registers the
+// allocation to be stopped if the task group has stop_after_client_disconnect configured
+func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
+	tg := allocTaskGroup(alloc)
+	if tg.StopAfterClientDisconnect != nil {
+		h.allocHookCh <- alloc
+	}
+}
+
+// shouldStop is called on a restored alloc to determine if lastOk is sufficiently in the
+// past that it should be prevented from restarting
+func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
+	tg := allocTaskGroup(alloc)
+	if tg.StopAfterClientDisconnect != nil {
+		now := time.Now()
+		return now.After(h.lastOk.Add(*tg.StopAfterClientDisconnect))
+	}
+	return false
+}
+
+// watch is a loop that checks for allocations that should be stopped. It also manages the
+// registration of allocs to be stopped in a single goroutine.
+func (h *heartbeatStop) watch() {
+	// If we never manage to successfully contact the server, we want to stop our allocs
+	// after their configured duration from this start time
+	h.lastOk = time.Now()
+	stop := make(chan string, 1)
+	var now time.Time
+	var interval time.Duration
+	checkAllocs := false
+
+	for {
+		// minimize the interval
+		interval = 5 * time.Second
+		for _, t := range h.allocInterval {
+			if t < interval {
+				interval = t
+			}
+		}
+
+		checkAllocs = false
+		timeout := time.After(interval)
+
+		select {
+		case allocID := <-stop:
+			if err := h.stopAlloc(allocID); err != nil {
+				h.logger.Warn("error stopping alloc on heartbeat timeout", "alloc", allocID, "error", err)
+				continue
+			}
+			delete(h.allocInterval, allocID)
+
+		case alloc := <-h.allocHookCh:
+			tg := allocTaskGroup(alloc)
+			if tg.StopAfterClientDisconnect != nil {
+				h.allocInterval[alloc.ID] = *tg.StopAfterClientDisconnect
+			}
+
+		case <-timeout:
+			checkAllocs = true
+
+		case <-h.shutdownCh:
+			return
+		}
+
+		if !checkAllocs {
+			continue
+		}
+
+		now = time.Now()
+		for allocID, d := range h.allocInterval {
+			if now.After(h.lastOk.Add(d)) {
+				stop <- allocID
+			}
+		}
+	}
+}
+
+// setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk
+func (h *heartbeatStop) setLastOk() error {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	t := time.Now()
+	h.lastOk = t
+	// We may encounter an error here, but we want to update the in-memory time
+	// unconditionally, since we will actively terminate tasks if it ages too much.
+	// The stored value is only used when restarting the client after a crash, so it
+	// is better to update the runtime value and risk a stale stored value.
+	return h.state.PutLastHeartbeatOk(t)
+}
+
+func (h *heartbeatStop) getLastOk() time.Time {
+	h.lock.RLock()
+	defer h.lock.RUnlock()
+	return h.lastOk
+}
+
+// stopAlloc stops the allocation by destroying its runner
+func (h *heartbeatStop) stopAlloc(allocID string) error {
+	runner, err := h.getRunner(allocID)
+	if err != nil {
+		return err
+	}
+
+	runner.Destroy()
+	return nil
+}
+
+func allocTaskGroup(alloc *structs.Allocation) *structs.TaskGroup {
+	for _, tg := range alloc.Job.TaskGroups {
+		if tg.Name == alloc.TaskGroup {
+			return tg
+		}
+	}
+	return nil
+}
diff --git a/client/heartbeatstop_test.go b/client/heartbeatstop_test.go
new file mode 100644
index 000000000..257b91b44
--- /dev/null
+++ b/client/heartbeatstop_test.go
@@ -0,0 +1,72 @@
+package client
+
+import (
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestHeartbeatStop_allocHook(t *testing.T) {
+	t.Parallel()
+
+	server, _, cleanupS1 := testServer(t, nil)
+	defer cleanupS1()
+	testutil.WaitForLeader(t, server.RPC)
+
+	client, cleanupC1 := TestClient(t, func(c *config.Config) {
+		c.RPCHandler = server
+	})
+	defer cleanupC1()
+
+	// last heartbeat is persisted in the state db
+	err := client.registerNode()
+	require.NoError(t, err)
+	last, err := client.stateDB.GetLastHeartbeatOk()
+	require.NoError(t, err)
+	require.Empty(t, last)
+
+	// an allocation, with a tiny lease
+	d := 1 * time.Microsecond
+	alloc := &structs.Allocation{
+		ID:        uuid.Generate(),
+		TaskGroup: "foo",
+		Job: &structs.Job{
+			TaskGroups: []*structs.TaskGroup{
+				{
+					Name:                      "foo",
+					StopAfterClientDisconnect: &d,
+				},
+			},
+		},
+		Resources: &structs.Resources{
+			CPU:      100,
+			MemoryMB: 100,
+			DiskMB:   0,
+		},
+	}
+
+	// the alloc should be registered in heartbeatStop.allocInterval
+	err = client.addAlloc(alloc, "")
+	require.NoError(t, err)
+	testutil.WaitForResult(func() (bool, error) {
+		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
+		return ok, nil
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+
+	// the tiny lease causes the watch loop to destroy it
+	testutil.WaitForResult(func() (bool, error) {
+		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
+		return !ok, nil
+	}, func(err error) {
+		require.NoError(t, err)
+	})
+
+	require.Empty(t, client.allocs[alloc.ID])
+}
diff --git a/client/state/interface.go b/client/state/interface.go
index dc492d5ec..d4ecb1257 100644
--- a/client/state/interface.go
+++ b/client/state/interface.go
@@ -1,6 +1,8 @@
 package state
 
 import (
+	"time"
+
 	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
 	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
 	"github.com/hashicorp/nomad/client/dynamicplugins"
@@ -76,6 +78,13 @@ type StateDB interface {
 	// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
 	PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error
 
+	// GetLastHeartbeatOk retrieves the stored last known good heartbeat
+	GetLastHeartbeatOk() (time.Time, error)
+
+	// PutLastHeartbeatOk stores the last heartbeat known to have made the round trip to
+	// the server
+	PutLastHeartbeatOk(time.Time) error
+
 	// Close the database. Unsafe for further use after calling regardless
 	// of return value.
 	Close() error
diff --git a/client/state/memdb.go b/client/state/memdb.go
index 63e967e45..46a3addce 100644
--- a/client/state/memdb.go
+++ b/client/state/memdb.go
@@ -2,6 +2,7 @@ package state
 
 import (
 	"sync"
+	"time"
 
 	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
@@ -33,6 +34,9 @@ type MemDB struct {
 	// dynamicmanager -> registry-state
 	dynamicManagerPs *dynamicplugins.RegistryState
 
+	// lastHeartbeatOk -> last_heartbeat_ok
+	lastHeartbeatOk time.Time
+
 	logger hclog.Logger
 
 	mu sync.RWMutex
@@ -89,6 +93,19 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
 	return nil
 }
 
+func (m *MemDB) GetLastHeartbeatOk() (time.Time, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.lastHeartbeatOk, nil
+}
+
+func (m *MemDB) PutLastHeartbeatOk(t time.Time) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.lastHeartbeatOk = t
+	return nil
+}
+
 func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/client/state/noopdb.go b/client/state/noopdb.go
index 28fbd2c15..fbb4535fe 100644
--- a/client/state/noopdb.go
+++ b/client/state/noopdb.go
@@ -1,6 +1,8 @@
 package state
 
 import (
+	"time"
+
 	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
 	dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
 	"github.com/hashicorp/nomad/client/dynamicplugins"
@@ -79,6 +81,14 @@ func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState,
 	return nil, nil
 }
 
+func (n NoopDB) PutLastHeartbeatOk(t time.Time) error {
+	return nil
+}
+
+func (n NoopDB) GetLastHeartbeatOk() (time.Time, error) {
+	return time.Time{}, nil
+}
+
 func (n NoopDB) Close() error {
 	return nil
 }
diff --git a/client/state/state_database.go b/client/state/state_database.go
index a9a958f5f..09d33e0d4 100644
--- a/client/state/state_database.go
+++ b/client/state/state_database.go
@@ -91,6 +91,12 @@
 
 	// registryStateKey is the key at which dynamic plugin registry state is stored
 	registryStateKey = []byte("registry_state")
+
+	// lastHeartbeatOkBucket is the bucket for storing the last known good heartbeat time
+	lastHeartbeatOkBucket = []byte("last_heartbeat_ok")
+
+	// lastHeartbeatOkKey is the key for the last known good heartbeat time
+	lastHeartbeatOkKey = []byte("last_heartbeat")
 )
 
 // taskBucketName returns the bucket name for the given task name.
@@ -655,6 +661,51 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS
 	return ps, nil
 }
 
+// PutLastHeartbeatOk stores the last known good heartbeat time,
+// or returns an error.
+func (s *BoltStateDB) PutLastHeartbeatOk(t time.Time) error {
+	return s.db.Update(func(tx *boltdd.Tx) error {
+		// Retrieve the last-heartbeat bucket, creating it if necessary
+		bkt, err := tx.CreateBucketIfNotExists(lastHeartbeatOkBucket)
+		if err != nil {
+			return err
+		}
+
+		return bkt.Put(lastHeartbeatOkKey, t.Unix())
+	})
+}
+
+// GetLastHeartbeatOk retrieves the last known good heartbeat time,
+// or returns an error.
+func (s *BoltStateDB) GetLastHeartbeatOk() (time.Time, error) {
+	var unix int64
+	err := s.db.View(func(tx *boltdd.Tx) error {
+		bkt := tx.Bucket(lastHeartbeatOkBucket)
+		if bkt == nil {
+			// No state, return
+			return nil
+		}
+
+		// Restore the last heartbeat time if it exists
+		if err := bkt.Get(lastHeartbeatOkKey, &unix); err != nil {
+			if !boltdd.IsErrNotFound(err) {
+				return fmt.Errorf("failed to read last heartbeat state: %v", err)
+			}
+
+			// Key not found, leave the zero value
+			unix = 0
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return time.Time{}, err
+	}
+
+	return time.Unix(unix, 0), nil
+}
+
 // init initializes metadata entries in a newly created state database.
 func (s *BoltStateDB) init() error {
 	return s.db.Update(func(tx *boltdd.Tx) error {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 8f3eb060f..edef82ad9 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -5206,6 +5206,10 @@ type TaskGroup struct {
 	// ShutdownDelay is the amount of time to wait between deregistering
 	// group services in consul and stopping tasks.
 	ShutdownDelay *time.Duration
+
+	// StopAfterClientDisconnect, if set, configures the client to stop the task group
+	// after this duration since the last known good heartbeat
+	StopAfterClientDisconnect *time.Duration
 }
 
 func (tg *TaskGroup) Copy() *TaskGroup {
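
For reference, the stop decision in shouldStop and the watch loop reduces to a deadline comparison: an alloc is stopped once the last successful heartbeat is older than the task group's stop_after_client_disconnect. A minimal standalone sketch of that check (shouldStopAfterDisconnect is an illustrative name, not a function in the patch):

package main

import (
	"fmt"
	"time"
)

// shouldStopAfterDisconnect mirrors heartbeatStop.shouldStop: stop once the last
// successful heartbeat is further in the past than the configured duration.
func shouldStopAfterDisconnect(lastOk time.Time, stopAfter *time.Duration, now time.Time) bool {
	if stopAfter == nil {
		// not configured for this task group; never stop on disconnect
		return false
	}
	return now.After(lastOk.Add(*stopAfter))
}

func main() {
	stopAfter := 90 * time.Second
	lastOk := time.Now().Add(-2 * time.Minute) // servers unreachable for two minutes

	fmt.Println(shouldStopAfterDisconnect(lastOk, &stopAfter, time.Now())) // true: 2m > 90s
	fmt.Println(shouldStopAfterDisconnect(lastOk, nil, time.Now()))        // false: not configured
}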
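The new StateDB methods persist the last-ok time so a restarted client can still enforce the deadline while the servers remain partitioned. A minimal in-memory stand-in for that contract (fakeHeartbeatStore is illustrative only; the real implementations are MemDB and BoltStateDB above), showing that a zero time means no successful heartbeat has ever been recorded, which newHeartbeatStop ignores:

package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeHeartbeatStore stands in for the PutLastHeartbeatOk/GetLastHeartbeatOk
// pair added to the StateDB interface.
type fakeHeartbeatStore struct {
	mu     sync.RWMutex
	lastOk time.Time
}

func (s *fakeHeartbeatStore) PutLastHeartbeatOk(t time.Time) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastOk = t
	return nil
}

func (s *fakeHeartbeatStore) GetLastHeartbeatOk() (time.Time, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.lastOk, nil
}

func main() {
	store := &fakeHeartbeatStore{}

	// Zero value: no heartbeat ever recorded, so the restored state is ignored.
	last, _ := store.GetLastHeartbeatOk()
	fmt.Println("never heartbeated:", last.IsZero()) // true

	_ = store.PutLastHeartbeatOk(time.Now())
	last, _ = store.GetLastHeartbeatOk()
	fmt.Println("zero after heartbeat:", last.IsZero()) // false
}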
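The watch loop wakes at least every five seconds, or sooner when any tracked alloc has a shorter stop_after_client_disconnect, so short leases are checked promptly. A small sketch of that interval selection (nextWakeInterval is an illustrative name, not part of the patch):

package main

import (
	"fmt"
	"time"
)

// nextWakeInterval mirrors the "minimize the interval" step of the watch loop:
// start from the default and shrink to the smallest configured alloc interval.
func nextWakeInterval(defaultInterval time.Duration, allocInterval map[string]time.Duration) time.Duration {
	interval := defaultInterval
	for _, d := range allocInterval {
		if d < interval {
			interval = d
		}
	}
	return interval
}

func main() {
	intervals := map[string]time.Duration{
		"alloc-a": 30 * time.Second,
		"alloc-b": 2 * time.Second,
	}
	fmt.Println(nextWakeInterval(5*time.Second, intervals)) // 2s
	fmt.Println(nextWakeInterval(5*time.Second, nil))       // 5s
}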