boltDB database for client state

This commit is contained in:
Alex Dadgar 2017-04-29 15:43:23 -07:00
parent bddedd7aba
commit b94f855326
4 changed files with 185 additions and 25 deletions

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@ -27,6 +28,13 @@ const (
taskReceivedSyncLimit = 30 * time.Second
)
var (
// The following are the key paths written to the state database
allocRunnerStateImmutableKey = []byte("immutable")
allocRunnerStateMutableKey = []byte("mutable")
allocRunnerStateAllocDirKey = []byte("alloc-dir")
)
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation)
@ -69,8 +77,15 @@ type AllocRunner struct {
destroyLock sync.Mutex
waitCh chan struct{}
// serialize saveAllocRunnerState calls
persistLock sync.Mutex
// State related fields
// stateDB is used to store the alloc runners state
stateDB *bolt.DB
// immutablePersisted and allocDirPersisted are used to track whether the
// immutable data and the alloc dir have been persisted. Once persisted we
// can lower write volume by not re-writing these values
immutablePersisted bool
allocDirPersisted bool
}
// allocRunnerState is used to snapshot the state of the alloc runner
@ -95,13 +110,29 @@ type allocRunnerState struct {
} `json:"Context,omitempty"`
}
// allocRunnerImmutableState is state that only has to be written once as it
// doesn't change over the life-cycle of the alloc_runner.
type allocRunnerImmutableState struct {
Version string
Alloc *structs.Allocation
}
// allocRunnerMutableState is state that has to be written on each save as it
// changes over the life-cycle of the alloc_runner.
type allocRunnerMutableState struct {
AllocClientStatus string
AllocClientDescription string
TaskStates map[string]*structs.TaskState
}
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {
ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
@ -144,6 +175,8 @@ func (r *AllocRunner) RestoreState() error {
}
}
// XXX needs to be updated and handle the upgrade path
// Restore fields
r.alloc = snap.Alloc
r.allocDir = snap.AllocDir
@ -178,7 +211,7 @@ func (r *AllocRunner) RestoreState() error {
}
task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr
// Skip tasks in terminal states.
@ -224,10 +257,19 @@ func (r *AllocRunner) SaveState() error {
}
func (r *AllocRunner) saveAllocRunnerState() error {
r.persistLock.Lock()
defer r.persistLock.Unlock()
// Start the transaction.
tx, err := r.stateDB.Begin(true)
if err != nil {
return err
}
// Create the snapshot.
// Grab the allocation bucket
allocBkt, err := getAllocationBucket(tx, r.alloc.ID)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
// Grab all the relevant data
alloc := r.Alloc()
r.allocLock.Lock()
@ -239,14 +281,45 @@ func (r *AllocRunner) saveAllocRunnerState() error {
allocDir := r.allocDir
r.allocDirLock.Unlock()
snap := allocRunnerState{
Version: r.config.Version,
Alloc: alloc,
AllocDir: allocDir,
// Write the immutable data
if !r.immutablePersisted {
immutable := &allocRunnerImmutableState{
Alloc: alloc,
Version: r.config.Version,
}
if err := putObject(allocBkt, allocRunnerStateImmutableKey, &immutable); err != nil {
return fmt.Errorf("failed to write alloc_runner immutable state: %v", err)
}
tx.OnCommit(func() {
r.immutablePersisted = true
})
}
// Write the alloc dir data if it hasn't been written before and it exists.
if !r.allocDirPersisted && r.allocDir != nil {
if err := putObject(allocBkt, allocRunnerStateAllocDirKey, allocDir); err != nil {
return fmt.Errorf("failed to write alloc_runner allocDir state: %v", err)
}
tx.OnCommit(func() {
r.allocDirPersisted = true
})
}
// Write the mutable state every time
mutable := &allocRunnerMutableState{
AllocClientStatus: allocClientStatus,
AllocClientDescription: allocClientDescription,
TaskStates: alloc.TaskStates,
}
return persistState(r.stateFilePath(), &snap)
if err := putObject(allocBkt, allocRunnerStateMutableKey, &mutable); err != nil {
return fmt.Errorf("failed to write alloc_runner mutable state: %v", err)
}
return tx.Commit()
}
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {
@ -525,7 +598,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
tr := NewTaskRunner(r.logger, r.config, r.stateDB, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

View File

@ -16,6 +16,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
@ -99,6 +100,9 @@ type Client struct {
config *config.Config
start time.Time
// stateDB is used to efficiently store client state.
stateDB *bolt.DB
// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
configLock sync.RWMutex
@ -340,6 +344,13 @@ func (c *Client) init() error {
}
c.logger.Printf("[INFO] client: using state directory %v", c.config.StateDir)
// Create or open the state database
db, err := bolt.Open(filepath.Join(c.config.StateDir, "state.db"), 0600, nil)
if err != nil {
return fmt.Errorf("failed to create state database", err)
}
c.stateDB = db
// Ensure the alloc dir exists if we have one
if c.config.AllocDir != "" {
if err := os.MkdirAll(c.config.AllocDir, 0755); err != nil {
@ -410,6 +421,13 @@ func (c *Client) Shutdown() error {
return nil
}
// Defer closing the database
defer func() {
if err := c.stateDB.Close(); err != nil {
c.logger.Printf("[ERR] client: failed to close state database on shutdown: %v", err)
}
}()
// Stop renewing tokens and secrets
if c.vaultClient != nil {
c.vaultClient.Stop()
@ -590,6 +608,8 @@ func (c *Client) restoreState() error {
return nil
}
// XXX Needs to be updated and handle the upgrade case
// Scan the directory
list, err := ioutil.ReadDir(filepath.Join(c.config.StateDir, "alloc"))
if err != nil && os.IsNotExist(err) {
@ -604,7 +624,7 @@ func (c *Client) restoreState() error {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
@ -1892,7 +1912,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
go ar.Run()

View File

@ -2,8 +2,10 @@ package driver
import (
"context"
"crypto/md5"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
@ -150,6 +152,20 @@ func (r *CreatedResources) Merge(o *CreatedResources) {
}
}
func (r *CreatedResources) Hash() []byte {
h := md5.New()
for k, values := range r.Resources {
io.WriteString(h, k)
io.WriteString(h, "values")
for i, v := range values {
io.WriteString(h, fmt.Sprintf("%d-%v", i, v))
}
}
return h.Sum(nil)
}
// Driver is used for execution of tasks. This allows Nomad
// to support many pluggable implementations of task drivers.
// Examples could include LXC, Docker, Qemu, etc.

View File

@ -1,9 +1,11 @@
package client
import (
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"log"
"os"
@ -13,6 +15,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-multierror"
@ -54,8 +57,15 @@ const (
vaultTokenFile = "vault_token"
)
var (
// taskRunnerStateAllKey holds all the task runners state. At the moment
// there is no need to split it
taskRunnerStateAllKey = []byte("simple-all")
)
// TaskRunner is used to wrap a task within an allocation and provide the execution context.
type TaskRunner struct {
stateDB *bolt.DB
config *config.Config
updater TaskStateUpdater
logger *log.Logger
@ -142,17 +152,33 @@ type TaskRunner struct {
// AllocRunner, so all state fields must be synchronized using this
// lock.
persistLock sync.Mutex
// persistedHash is the hash of the last persisted snapshot. It is used to
// detect if a new snapshot has to be writen to disk.
persistedHash []byte
}
// taskRunnerState is used to snapshot the state of the task runner
type taskRunnerState struct {
Version string
Task *structs.Task
HandleID string
ArtifactDownloaded bool
TaskDirBuilt bool
CreatedResources *driver.CreatedResources
PayloadRendered bool
CreatedResources *driver.CreatedResources
}
func (s *taskRunnerState) Hash() []byte {
h := md5.New()
io.WriteString(h, s.Version)
io.WriteString(h, s.HandleID)
io.WriteString(h, fmt.Sprintf("%v", s.ArtifactDownloaded))
io.WriteString(h, fmt.Sprintf("%v", s.TaskDirBuilt))
io.WriteString(h, fmt.Sprintf("%v", s.PayloadRendered))
h.Write(s.CreatedResources.Hash())
return h.Sum(nil)
}
// TaskStateUpdater is used to signal that tasks state has changed.
@ -172,7 +198,7 @@ type SignalEvent struct {
// NewTaskRunner is used to create a new task context
func NewTaskRunner(logger *log.Logger, config *config.Config,
updater TaskStateUpdater, taskDir *allocdir.TaskDir,
stateDB *bolt.DB, updater TaskStateUpdater, taskDir *allocdir.TaskDir,
alloc *structs.Allocation, task *structs.Task,
vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI) *TaskRunner {
@ -189,6 +215,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
tc := &TaskRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
restartTracker: restartTracker,
@ -236,6 +263,8 @@ func (r *TaskRunner) stateFilePath() string {
// RestoreState is used to restore our state
func (r *TaskRunner) RestoreState() error {
// XXX needs to be updated and handle the upgrade path
// Load the snapshot
var snap taskRunnerState
if err := restoreState(r.stateFilePath(), &snap); err != nil {
@ -243,11 +272,6 @@ func (r *TaskRunner) RestoreState() error {
}
// Restore fields
if snap.Task == nil {
return fmt.Errorf("task runner snapshot includes nil Task")
} else {
r.task = snap.Task
}
r.artifactsDownloaded = snap.ArtifactDownloaded
r.taskDirBuilt = snap.TaskDirBuilt
r.payloadRendered = snap.PayloadRendered
@ -313,11 +337,11 @@ func (r *TaskRunner) RestoreState() error {
// SaveState is used to snapshot our state
func (r *TaskRunner) SaveState() error {
// XXX needs to be updated
r.persistLock.Lock()
defer r.persistLock.Unlock()
snap := taskRunnerState{
Task: r.task,
Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded,
TaskDirBuilt: r.taskDirBuilt,
@ -330,7 +354,34 @@ func (r *TaskRunner) SaveState() error {
snap.HandleID = r.handle.ID()
}
r.handleLock.Unlock()
return persistState(r.stateFilePath(), &snap)
h := snap.Hash()
if bytes.Equal(h, r.persistedHash) {
return nil
}
// Start the transaction.
tx, err := r.stateDB.Begin(true)
if err != nil {
return err
}
// Grab the task bucket
taskBkt, err := getTaskBucket(tx, r.alloc.ID, r.task.Name)
if err != nil {
return fmt.Errorf("failed to retrieve allocation bucket: %v", err)
}
if err := putObject(taskBkt, taskRunnerStateAllKey, &snap); err != nil {
return fmt.Errorf("failed to write task_runner state: %v", err)
}
// Store the hash that was persisted
tx.OnCommit(func() {
r.persistedHash = h
})
return tx.Commit()
}
// DestroyState is used to cleanup after ourselves