Getting snapshot of allocation from remote node (#1741)
* Added the alloc dir move
* Moving allocdirs when starting allocations
* Added the migrate flag to ephemeral disk
* Stopping migration if the allocation doesn't need migration any more
* Added the GetAllocDir method
* Refactored code
* Added a test for alloc runner
* Incorporated review comments
parent 0b433e4c73
commit d50c395421
@@ -84,8 +84,9 @@ type Service struct {

// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
    Sticky bool
    SizeMB int `mapstructure:"size"`
    Sticky  bool
    Migrate bool
    SizeMB  int `mapstructure:"size"`
}

// TaskGroup is the unit of scheduling.
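For context, the new Migrate flag only has an effect together with Sticky: sticky keeps the ephemeral disk's data across allocation replacements, and migrate additionally lets that data follow the allocation to another node. A minimal Go sketch of setting both flags on the struct above (the type is redeclared locally so the snippet stands alone, and the size value is illustrative, not taken from the diff):

package main

import "fmt"

// EphemeralDisk mirrors the struct in the hunk above; it is redeclared here
// only so this sketch compiles on its own.
type EphemeralDisk struct {
    Sticky  bool
    Migrate bool
    SizeMB  int `mapstructure:"size"`
}

func main() {
    disk := &EphemeralDisk{
        Sticky:  true, // keep the disk's data when the allocation is replaced
        Migrate: true, // and pull it over if the replacement lands on another node
        SizeMB:  300,  // illustrative size, not a value from the diff
    }
    fmt.Printf("%+v\n", disk)
}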
@@ -71,6 +71,8 @@ type AllocRunner struct {
    vaultClient vaultclient.VaultClient
    vaultTokens map[string]vaultToken

    otherAllocDir *allocdir.AllocDir

    destroy     bool
    destroyCh   chan struct{}
    destroyLock sync.Mutex
@@ -192,6 +194,14 @@ func (r *AllocRunner) RestoreState() error {
    return mErr.ErrorOrNil()
}

// GetAllocDir returns the alloc dir for the alloc runner
func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
    if r.ctx == nil {
        return nil
    }
    return r.ctx.AllocDir
}

// SaveState is used to snapshot the state of the alloc runner
// if the fullSync is marked as false only the state of the Alloc Runner
// is snapshotted. If fullSync is marked as true, we snapshot
@@ -436,6 +446,14 @@ func (r *AllocRunner) Run() {
            return
        }
        r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
        if r.otherAllocDir != nil {
            if err := allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
                r.logger.Printf("[ERROR] client: failed to move alloc dir into alloc %q: %v", r.alloc.ID, err)
            }
            if err := r.otherAllocDir.Destroy(); err != nil {
                r.logger.Printf("[ERROR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
            }
        }
    }
    r.ctxLock.Unlock()
@@ -533,6 +551,12 @@ OUTER:
    r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

// SetPreviousAllocDir sets the previous allocation directory of the current
// allocation
func (r *AllocRunner) SetPreviousAllocDir(allocDir *allocdir.AllocDir) {
    r.otherAllocDir = allocDir
}

// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
@@ -861,3 +861,71 @@ func TestAllocRunner_SaveRestoreState_VaultTokens_Invalid(t *testing.T) {
        t.Fatalf("took too long to terminate")
    }
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
    // Create an alloc runner
    alloc := mock.Alloc()
    task := alloc.Job.TaskGroups[0].Tasks[0]
    task.Driver = "mock_driver"
    task.Config = map[string]interface{}{
        "run_for": "1s",
    }
    upd, ar := testAllocRunnerFromAlloc(alloc, false)
    go ar.Run()

    testutil.WaitForResult(func() (bool, error) {
        if upd.Count == 0 {
            return false, fmt.Errorf("No updates")
        }
        last := upd.Allocs[upd.Count-1]
        if last.ClientStatus != structs.AllocClientStatusComplete {
            return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
        }
        return true, nil
    }, func(err error) {
        t.Fatalf("err: %v", err)
    })

    // Write some data in data dir and task dir of the alloc
    dataFile := filepath.Join(ar.ctx.AllocDir.SharedDir, "data", "data_file")
    ioutil.WriteFile(dataFile, []byte("hello world"), os.ModePerm)
    taskDir := ar.ctx.AllocDir.TaskDirs[task.Name]
    taskLocalFile := filepath.Join(taskDir, "local", "local_file")
    ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm)

    // Create another alloc runner
    alloc1 := mock.Alloc()
    task = alloc1.Job.TaskGroups[0].Tasks[0]
    task.Driver = "mock_driver"
    task.Config = map[string]interface{}{
        "run_for": "1s",
    }
    upd1, ar1 := testAllocRunnerFromAlloc(alloc1, false)
    ar1.SetPreviousAllocDir(ar.ctx.AllocDir)
    go ar1.Run()

    testutil.WaitForResult(func() (bool, error) {
        if upd1.Count == 0 {
            return false, fmt.Errorf("No updates")
        }
        last := upd1.Allocs[upd1.Count-1]
        if last.ClientStatus != structs.AllocClientStatusComplete {
            return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
        }
        return true, nil
    }, func(err error) {
        t.Fatalf("err: %v", err)
    })

    // Ensure that data from ar was moved into ar1's alloc dir
    taskDir = ar1.ctx.AllocDir.TaskDirs[task.Name]
    taskLocalFile = filepath.Join(taskDir, "local", "local_file")
    if fileInfo, _ := os.Stat(taskLocalFile); fileInfo == nil {
        t.Fatalf("file %v not found", taskLocalFile)
    }

    dataFile = filepath.Join(ar1.ctx.AllocDir.SharedDir, "data", "data_file")
    if fileInfo, _ := os.Stat(dataFile); fileInfo == nil {
        t.Fatalf("file %v not found", dataFile)
    }
}
@@ -190,6 +190,34 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
    return nil
}

// Move moves the shared data and task local dirs
func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
    // Move the data directory
    otherDataDir := filepath.Join(other.SharedDir, "data")
    dataDir := filepath.Join(d.SharedDir, "data")
    if fileInfo, err := os.Stat(otherDataDir); fileInfo != nil && err == nil {
        if err := os.Rename(otherDataDir, dataDir); err != nil {
            return fmt.Errorf("error moving data dir: %v", err)
        }
    }

    // Move the task directories
    for _, task := range tasks {
        taskDir := filepath.Join(other.AllocDir, task.Name)
        otherTaskLocal := filepath.Join(taskDir, TaskLocal)

        if fileInfo, err := os.Stat(otherTaskLocal); fileInfo != nil && err == nil {
            if taskDir, ok := d.TaskDirs[task.Name]; ok {
                if err := os.Rename(otherTaskLocal, filepath.Join(taskDir, TaskLocal)); err != nil {
                    return fmt.Errorf("error moving task local dir: %v", err)
                }
            }
        }
    }

    return nil
}

// Tears down previously built directory structure.
func (d *AllocDir) Destroy() error {
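Move relies on os.Rename, which relinks an existing directory tree in a single cheap operation when source and destination sit on the same filesystem; that is why the local, sticky case never copies file contents, while the cross-node case needs the snapshot download added in client/client.go below. A standalone sketch of the rename-if-present step, with hypothetical paths that are not taken from the diff:

package main

import (
    "log"
    "os"
    "path/filepath"
)

// moveIfPresent renames src onto dst when src exists; it is a no-op otherwise.
func moveIfPresent(src, dst string) error {
    if _, err := os.Stat(src); err != nil {
        return nil // nothing to move
    }
    return os.Rename(src, dst)
}

func main() {
    // Hypothetical alloc dir layout, for illustration only.
    prev := "/tmp/prev-alloc"
    next := "/tmp/next-alloc"
    src := filepath.Join(prev, "alloc", "data")
    dst := filepath.Join(next, "alloc", "data")
    if err := moveIfPresent(src, dst); err != nil {
        log.Fatalf("move failed: %v", err)
    }
}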
@@ -235,7 +235,6 @@ func TestAllocDir_Snapshot(t *testing.T) {
    if err != nil {
        t.Fatalf("Couldn't create temp dir: %v", err)
    }

    defer os.RemoveAll(tmp)

    d := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
@@ -288,3 +287,61 @@ func TestAllocDir_Snapshot(t *testing.T) {
        t.Fatalf("bad files: %#v", files)
    }
}

func TestAllocDir_Move(t *testing.T) {
    tmp, err := ioutil.TempDir("", "AllocDir")
    if err != nil {
        t.Fatalf("Couldn't create temp dir: %v", err)
    }
    defer os.RemoveAll(tmp)

    // Create two alloc dirs
    d1 := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
    defer d1.Destroy()

    d2 := NewAllocDir(tmp, structs.DefaultResources().DiskMB)
    defer d2.Destroy()

    tasks := []*structs.Task{t1, t2}
    if err := d1.Build(tasks); err != nil {
        t.Fatalf("Build(%v) failed: %v", tasks, err)
    }

    if err := d2.Build(tasks); err != nil {
        t.Fatalf("Build(%v) failed: %v", tasks, err)
    }

    dataDir := filepath.Join(d1.SharedDir, "data")
    taskDir := d1.TaskDirs[t1.Name]
    taskLocal := filepath.Join(taskDir, "local")

    // Write a file to the shared dir.
    exp := []byte{'f', 'o', 'o'}
    file := "bar"
    if err := ioutil.WriteFile(filepath.Join(dataDir, file), exp, 0777); err != nil {
        t.Fatalf("Couldn't write file to shared directory: %v", err)
    }

    // Write a file to the task local
    exp = []byte{'b', 'a', 'r'}
    file1 := "lol"
    if err := ioutil.WriteFile(filepath.Join(taskLocal, file1), exp, 0777); err != nil {
        t.Fatalf("couldn't write to task local directory: %v", err)
    }

    // Move the d1 allocdir to d2
    if err := d2.Move(d1, []*structs.Task{t1, t2}); err != nil {
        t.Fatalf("err: %v", err)
    }

    // Ensure the files in d1 are present in d2
    fi, err := os.Stat(filepath.Join(d2.SharedDir, "data", "bar"))
    if err != nil || fi == nil {
        t.Fatalf("data dir was not moved")
    }

    fi, err = os.Stat(filepath.Join(d2.TaskDirs[t1.Name], "local", "lol"))
    if err != nil || fi == nil {
        t.Fatalf("task local dir was not moved")
    }
}
client/client.go (310 changed lines)
@@ -1,11 +1,14 @@
package client

import (
    "archive/tar"
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net"
    "net/http"
    "os"
    "path/filepath"
    "strconv"
@@ -146,6 +149,10 @@ type Client struct {

    // vaultClient is used to interact with Vault for token and secret renewals
    vaultClient vaultclient.VaultClient

    // migratingAllocs is the set of allocs whose data migration is in flight
    migratingAllocs     map[string]chan struct{}
    migratingAllocsLock sync.Mutex
}

var (
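The migratingAllocs map is a per-allocation cancellation signal: a chan struct{} is created when the client starts pulling a previous allocation's data, and closing that channel (as runAllocs does further down once ShouldMigrate turns false) tells the in-flight migration to stop. A standalone sketch of the same pattern, using only the standard library and hypothetical names:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    stop := make(map[string]chan struct{}) // alloc ID -> stop signal

    // Start a hypothetical migration for alloc "a1".
    mu.Lock()
    stop["a1"] = make(chan struct{})
    ch := stop["a1"]
    mu.Unlock()

    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            select {
            case <-ch:
                fmt.Println("migration cancelled")
                return
            default:
                // ... copy the next chunk of data ...
                time.Sleep(10 * time.Millisecond)
            }
        }
    }()

    // Later, the alloc no longer needs migration: close its channel.
    mu.Lock()
    close(stop["a1"])
    delete(stop, "a1")
    mu.Unlock()

    <-done
}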
@@ -159,19 +166,18 @@
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
    // Create the client
    c := &Client{
        config:              cfg,
        consulSyncer:        consulSyncer,
        start:               time.Now(),
        connPool:            nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
        servers:             newServerList(),
        triggerDiscoveryCh:  make(chan struct{}),
        serversDiscoveredCh: make(chan struct{}),
        logger:              logger,
        hostStatsCollector:  stats.NewHostStatsCollector(),
        allocs:              make(map[string]*AllocRunner),
        blockedAllocations:  make(map[string]*structs.Allocation),
        allocUpdates:        make(chan *structs.Allocation, 64),
        shutdownCh:          make(chan struct{}),
        config:              cfg,
        consulSyncer:        consulSyncer,
        start:               time.Now(),
        connPool:            nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
        logger:              logger,
        hostStatsCollector:  stats.NewHostStatsCollector(),
        allocs:              make(map[string]*AllocRunner),
        blockedAllocations:  make(map[string]*structs.Allocation),
        allocUpdates:        make(chan *structs.Allocation, 64),
        shutdownCh:          make(chan struct{}),
        migratingAllocs:     make(map[string]chan struct{}),
        servers:             newServerList(),
    }

    // Initialize the client
@@ -441,7 +447,7 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
    if !ok {
        return nil, fmt.Errorf("alloc not found")
    }
    return ar.ctx.AllocDir, nil
    return ar.GetAllocDir(), nil
}

// GetServers returns the list of nomad servers this client is aware of.
@@ -1056,7 +1062,11 @@ func (c *Client) allocSync() {
            // terminal state then start the blocked allocation
            c.blockedAllocsLock.Lock()
            if blockedAlloc, ok := c.blockedAllocations[alloc.ID]; ok && alloc.Terminated() {
                if err := c.addAlloc(blockedAlloc); err != nil {
                var prevAllocDir *allocdir.AllocDir
                if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
                    prevAllocDir = ar.GetAllocDir()
                }
                if err := c.addAlloc(blockedAlloc, prevAllocDir); err != nil {
                    c.logger.Printf("[ERR] client: failed to add alloc which was previously blocked %q: %v",
                        blockedAlloc.ID, err)
                }
@@ -1297,13 +1307,26 @@ func (c *Client) runAllocs(update *allocUpdates) {
            c.logger.Printf("[ERR] client: failed to update alloc '%s': %v",
                update.exist.ID, err)
        }

        // See if the updated alloc is getting migrated
        c.migratingAllocsLock.Lock()
        ch, ok := c.migratingAllocs[update.updated.ID]
        c.migratingAllocsLock.Unlock()
        if ok {
            // Stopping the migration if the allocation doesn't need any
            // migration
            if !update.updated.ShouldMigrate() {
                close(ch)
            }
        }
    }

    // Start the new allocations
    for _, add := range diff.added {
        // If the allocation is chanined and the previous allocation hasn't
        // If the allocation is chained and the previous allocation hasn't
        // terminated yet, then add the alloc to the blocked queue.
        if ar, ok := c.getAllocRunners()[add.PreviousAllocation]; ok && !ar.Alloc().Terminated() {
        ar, ok := c.getAllocRunners()[add.PreviousAllocation]
        if ok && !ar.Alloc().Terminated() {
            c.logger.Printf("[DEBUG] client: added alloc %q to blocked queue", add.ID)
            c.blockedAllocsLock.Lock()
            c.blockedAllocations[add.PreviousAllocation] = add
@@ -1311,7 +1334,25 @@ func (c *Client) runAllocs(update *allocUpdates) {
            continue
        }

        if err := c.addAlloc(add); err != nil {
        // This means the allocation has a previous allocation on another node
        // so we will block for the previous allocation to complete
        if add.PreviousAllocation != "" && !ok {
            c.migratingAllocsLock.Lock()
            c.migratingAllocs[add.ID] = make(chan struct{})
            c.migratingAllocsLock.Unlock()
            go c.blockForRemoteAlloc(add)
            continue
        }

        // Setting the previous allocdir if the allocation had a terminal
        // previous allocation
        var prevAllocDir *allocdir.AllocDir
        tg := add.Job.LookupTaskGroup(add.TaskGroup)
        if tg != nil && tg.EphemeralDisk.Sticky == true && ar != nil {
            prevAllocDir = ar.GetAllocDir()
        }

        if err := c.addAlloc(add, prevAllocDir); err != nil {
            c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
                add.ID, err)
        }
@@ -1323,6 +1364,236 @@ func (c *Client) runAllocs(update *allocUpdates) {
    }
}

// blockForRemoteAlloc blocks until the previous allocation of an allocation has
// been terminated and migrates the snapshot data
func (c *Client) blockForRemoteAlloc(alloc *structs.Allocation) {
    c.logger.Printf("[DEBUG] client: blocking alloc %q for previous allocation %q", alloc.ID, alloc.PreviousAllocation)

    // Removing the allocation from the set of allocs which are currently
    // undergoing migration
    defer func() {
        c.migratingAllocsLock.Lock()
        delete(c.migratingAllocs, alloc.ID)
        c.migratingAllocsLock.Unlock()
    }()

    // Block until the previous allocation migrates to terminal state
    prevAlloc, err := c.waitForAllocTerminal(alloc.PreviousAllocation)
    if err != nil {
        c.logger.Printf("[ERR] client: error waiting for allocation %q: %v", alloc.PreviousAllocation, err)
    }

    // Migrate the data from the remote node
    prevAllocDir, err := c.migrateRemoteAllocDir(prevAlloc, alloc.ID)
    if err != nil {
        c.logger.Printf("[ERR] client: error migrating data from remote alloc %q: %v", alloc.PreviousAllocation, err)
    }

    // Add the allocation
    if err := c.addAlloc(alloc, prevAllocDir); err != nil {
        c.logger.Printf("[ERR] client: error adding alloc: %v", err)
    }
}

// waitForAllocTerminal waits for an allocation with the given alloc id to
// transition to terminal state and blocks the caller until then.
func (c *Client) waitForAllocTerminal(allocID string) (*structs.Allocation, error) {
    req := structs.AllocSpecificRequest{
        AllocID: allocID,
        QueryOptions: structs.QueryOptions{
            Region:     c.Region(),
            AllowStale: true,
        },
    }

    for {
        resp := structs.SingleAllocResponse{}
        err := c.RPC("Alloc.GetAlloc", &req, &resp)
        if err != nil {
            c.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err)
            retry := c.retryIntv(getAllocRetryIntv)
            select {
            case <-time.After(retry):
                continue
            case <-c.shutdownCh:
                return nil, fmt.Errorf("aborting because client is shutting down")
            }
        }
        if resp.Alloc == nil {
            return nil, nil
        }
        if resp.Alloc.Terminated() {
            return resp.Alloc, nil
        }

        // Update the query index.
        if resp.Index > req.MinQueryIndex {
            req.MinQueryIndex = resp.Index
        }

    }
}

// migrateRemoteAllocDir migrates the allocation directory from a remote node to
// the current node
func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string) (*allocdir.AllocDir, error) {
    if alloc == nil {
        return nil, nil
    }

    tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
    if tg == nil {
        return nil, fmt.Errorf("Task Group %q not found in job %q", alloc.TaskGroup, alloc.Job.ID)
    }

    // Skip migration of data if the ephemeral disk is not sticky or
    // migration is turned off.
    if !tg.EphemeralDisk.Sticky || !tg.EphemeralDisk.Migrate {
        return nil, nil
    }

    node, err := c.getNode(alloc.NodeID)

    // If the node is down then skip migrating the data
    if err != nil {
        return nil, fmt.Errorf("error retrieving node %v: %v", alloc.NodeID, err)
    }

    // Check if node is nil
    if node == nil {
        return nil, fmt.Errorf("node %q doesn't exist", alloc.NodeID)
    }

    // skip migration if the remote node is down
    if node.Status == structs.NodeStatusDown {
        c.logger.Printf("[INFO] client: not migrating data from alloc %q since node %q is down", alloc.ID, alloc.NodeID)
        return nil, nil
    }

    // Create the previous alloc dir
    pathToAllocDir := filepath.Join(c.config.AllocDir, alloc.ID)
    if err := os.MkdirAll(pathToAllocDir, 0777); err != nil {
        c.logger.Printf("[ERR] client: error creating previous allocation dir: %v", err)
    }

    // Get the snapshot
    url := fmt.Sprintf("http://%v/v1/client/allocation/%v/snapshot", node.HTTPAddr, alloc.ID)
    resp, err := http.Get(url)
    if err != nil {
        os.RemoveAll(pathToAllocDir)
        c.logger.Printf("[ERR] client: error getting snapshot: %v", err)
        return nil, fmt.Errorf("error getting snapshot for alloc %v: %v", alloc.ID, err)
    }
    tr := tar.NewReader(resp.Body)
    defer resp.Body.Close()

    buf := make([]byte, 1024)

    stopMigrating, ok := c.migratingAllocs[allocID]
    if !ok {
        os.RemoveAll(pathToAllocDir)
        return nil, fmt.Errorf("couldn't find a migration validity notifier for alloc: %v", alloc.ID)
    }
    for {
        // See if the alloc still needs migration
        select {
        case <-stopMigrating:
            os.RemoveAll(pathToAllocDir)
            c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", alloc.ID)
            return nil, nil
        case <-c.shutdownCh:
            os.RemoveAll(pathToAllocDir)
            c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", alloc.ID)
            return nil, nil
        default:
        }

        // Get the next header
        hdr, err := tr.Next()

        // If the snapshot has ended then we create the previous
        // allocdir
        if err == io.EOF {
            prevAllocDir := allocdir.NewAllocDir(pathToAllocDir, 0)
            return prevAllocDir, nil
        }
        // If there is an error then we avoid creating the alloc dir
        if err != nil {
            os.RemoveAll(pathToAllocDir)
            return nil, fmt.Errorf("error creating alloc dir for alloc %q: %v", alloc.ID, err)
        }

        // If the header is for a directory we create the directory
        if hdr.Typeflag == tar.TypeDir {
            os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), 0777)
            continue
        }
        // If the header is a file, we write to a file
        if hdr.Typeflag == tar.TypeReg {
            f, err := os.Create(filepath.Join(pathToAllocDir, hdr.Name))
            if err != nil {
                c.logger.Printf("[ERR] client: error creating file: %v", err)
                continue
            }

            // We write in chunks of 1024 bytes so that we can test if
            // the client is still alive
            for {
                if c.shutdown {
                    f.Close()
                    os.RemoveAll(pathToAllocDir)
                    c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", alloc.ID)
                    return nil, nil
                }

                n, err := tr.Read(buf)
                if err != nil {
                    f.Close()
                    if err != io.EOF {
                        return nil, fmt.Errorf("error reading snapshot: %v", err)
                    }
                    break
                }
                if _, err := f.Write(buf[:n]); err != nil {
                    f.Close()
                    os.RemoveAll(pathToAllocDir)
                    return nil, fmt.Errorf("error writing to file %q: %v", f.Name(), err)
                }
            }

        }
    }
}

// getNode gets the node from the server with the given Node ID
func (c *Client) getNode(nodeID string) (*structs.Node, error) {
    req := structs.NodeSpecificRequest{
        NodeID: nodeID,
        QueryOptions: structs.QueryOptions{
            Region:     c.Region(),
            AllowStale: true,
        },
    }

    resp := structs.SingleNodeResponse{}
    for {
        err := c.RPC("Node.GetNode", &req, &resp)
        if err != nil {
            c.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err)
            retry := c.retryIntv(getAllocRetryIntv)
            select {
            case <-time.After(retry):
                continue
            case <-c.shutdownCh:
                return nil, fmt.Errorf("aborting because client is shutting down")
            }
        }
        break
    }

    return resp.Node, nil
}

// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
    c.allocLock.Lock()
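migrateRemoteAllocDir streams the snapshot endpoint's tar archive straight onto disk. Below is a stripped-down, standalone sketch of that extraction loop; the URL and destination are hypothetical, there is no cancellation or client bookkeeping, and io.Copy stands in for the chunked reads the client uses so it can abort on shutdown. A real implementation should also sanitize hdr.Name before joining paths.

package main

import (
    "archive/tar"
    "io"
    "log"
    "net/http"
    "os"
    "path/filepath"
)

func main() {
    dest := "/tmp/prev-alloc"                // hypothetical destination
    url := "http://127.0.0.1:4646/snapshot" // hypothetical snapshot URL

    resp, err := http.Get(url)
    if err != nil {
        log.Fatalf("fetching snapshot: %v", err)
    }
    defer resp.Body.Close()

    tr := tar.NewReader(resp.Body)
    for {
        hdr, err := tr.Next()
        if err == io.EOF {
            break // archive finished
        }
        if err != nil {
            log.Fatalf("reading snapshot: %v", err)
        }
        path := filepath.Join(dest, hdr.Name)
        switch hdr.Typeflag {
        case tar.TypeDir:
            if err := os.MkdirAll(path, 0777); err != nil {
                log.Fatalf("creating dir: %v", err)
            }
        case tar.TypeReg:
            // Ensure the parent exists in case the archive omits directory entries.
            if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
                log.Fatalf("creating parent dir: %v", err)
            }
            f, err := os.Create(path)
            if err != nil {
                log.Fatalf("creating file: %v", err)
            }
            if _, err := io.Copy(f, tr); err != nil {
                f.Close()
                log.Fatalf("writing file: %v", err)
            }
            f.Close()
        }
    }
}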
@@ -1354,9 +1625,10 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
}

// addAlloc is invoked when we should add an allocation
func (c *Client) addAlloc(alloc *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.AllocDir) error {
    c.configLock.RLock()
    ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
    ar.SetPreviousAllocDir(prevAllocDir)
    c.configLock.RUnlock()
    go ar.Run()
@@ -477,6 +477,7 @@ func parseEphemeralDisk(result **structs.EphemeralDisk, list *ast.ObjectList) error {
    valid := []string{
        "sticky",
        "size",
        "migrate",
    }
    if err := checkHCLKeys(obj.Val, valid); err != nil {
        return err
@@ -2738,6 +2738,10 @@ type EphemeralDisk struct {

    // SizeMB is the size of the local disk
    SizeMB int `mapstructure:"size"`

    // Migrate determines if the Nomad client should migrate the allocation dir for
    // sticky allocations
    Migrate bool
}

// DefaultEphemeralDisk returns a EphemeralDisk with default configurations
@@ -2976,6 +2980,19 @@ func (a *Allocation) Stub() *AllocListStub {
    }
}

// ShouldMigrate returns whether the allocation needs data migration
func (a *Allocation) ShouldMigrate() bool {
    if a.DesiredStatus == AllocDesiredStatusStop || a.DesiredStatus == AllocDesiredStatusEvict {
        return false
    }

    if tg := a.Job.LookupTaskGroup(a.TaskGroup); tg != nil && (!tg.EphemeralDisk.Migrate || !tg.EphemeralDisk.Sticky) {
        return false
    }

    return true
}

var (
    // AllocationIndexRegex is a regular expression to find the allocation index.
    AllocationIndexRegex = regexp.MustCompile(".+\\[(\\d+)\\]$")
@@ -1156,6 +1156,62 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) {
    }
}

func TestAllocation_ShouldMigrate(t *testing.T) {
    alloc := Allocation{
        TaskGroup: "foo",
        Job: &Job{
            TaskGroups: []*TaskGroup{
                {
                    Name: "foo",
                    EphemeralDisk: &EphemeralDisk{
                        Migrate: true,
                        Sticky:  true,
                    },
                },
            },
        },
    }

    if !alloc.ShouldMigrate() {
        t.Fatalf("bad: %v", alloc)
    }

    alloc1 := Allocation{
        TaskGroup: "foo",
        Job: &Job{
            TaskGroups: []*TaskGroup{
                {
                    Name:          "foo",
                    EphemeralDisk: &EphemeralDisk{},
                },
            },
        },
    }

    if alloc1.ShouldMigrate() {
        t.Fatalf("bad: %v", alloc1)
    }

    alloc2 := Allocation{
        TaskGroup: "foo",
        Job: &Job{
            TaskGroups: []*TaskGroup{
                {
                    Name: "foo",
                    EphemeralDisk: &EphemeralDisk{
                        Sticky:  false,
                        Migrate: true,
                    },
                },
            },
        },
    }

    if alloc2.ShouldMigrate() {
        t.Fatalf("bad: %v", alloc2)
    }
}

func TestTaskArtifact_Validate_Checksum(t *testing.T) {
    cases := []struct {
        Input *TaskArtifact