switch from alloc blocker to new interface

interface has 3 implementations (sketched below):

1. local for blocking and moving data locally
2. remote for blocking and moving data from another node
3. noop for allocs that don't need to block
Michael Schurter 2017-08-10 10:56:51 -07:00
parent ee04717a0b
commit e41a654917
5 changed files with 498 additions and 680 deletions
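The prevAllocWatcher interface and the constructor that picks an implementation live in the new client/alloc_watcher.go below. As a quick orientation, the contract trimmed down from that file looks like this (names exactly as they appear in the diff):

type prevAllocWatcher interface {
	// Wait blocks until the previous alloc has terminated or the context is done.
	Wait(context.Context) error

	// Migrate moves data from the previous alloc's dir into dest; implementations
	// may no-op when sticky/migrate isn't configured.
	Migrate(ctx context.Context, dest *allocdir.AllocDir) error
}

newAllocWatcher selects the implementation: noopPrevAlloc when alloc.PreviousAllocation is empty, localPrevAlloc when the previous alloc's AllocRunner exists on this node, and remotePrevAlloc otherwise.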


@ -1,185 +0,0 @@
package client
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/nomad/structs"
)
// allocGetter is able to retrieve local and remote allocs.
type allocGetter interface {
// GetClientAlloc returns the alloc if an alloc ID is found locally,
// otherwise an error.
GetClientAlloc(allocID string) (*structs.Allocation, error)
// RPC allows retrieving remote allocs.
RPC(method string, args interface{}, reply interface{}) error
}
type allocBlocker struct {
// blocking is a map of allocs being watched to chans to signal their
// termination and optionally the node they were running on.
blocking map[string]chan string
blockingLock sync.Mutex
// allocs is used to retrieve local and remote allocs
allocs allocGetter
// region for making rpc calls
region string
logger *log.Logger
}
func newAllocBlocker(l *log.Logger, allocs allocGetter, region string) *allocBlocker {
return &allocBlocker{
blocking: make(map[string]chan string),
allocs: allocs,
region: region,
logger: l,
}
}
// allocTerminated signals that an alloc being blocked on has terminated or
// been GC'd, passing along the node it ran on (empty if unknown).
func (a *allocBlocker) allocTerminated(allocID, node string) {
a.blockingLock.Lock()
defer a.blockingLock.Unlock()
if ch, ok := a.blocking[allocID]; ok {
//TODO(schmichael) REMOVE
a.logger.Printf("[TRACE] client: XXX closing and deleting terminated blocking alloc %q", allocID)
ch <- ""
delete(a.blocking, allocID)
} else {
//TODO(schmichael) REMOVE
a.logger.Printf("[TRACE] client: XXX no waiting on terminated alloc %q", allocID)
}
}
// BlockOnAlloc blocks on an alloc terminating.
func (a *allocBlocker) BlockOnAlloc(ctx context.Context, allocID string) (string, error) {
// Register an intent to block until an alloc exists to prevent races
// between checking to see if it has already exited and waiting for it
// to exit
terminatedCh, err := a.watch(allocID)
if err != nil {
return "", err
}
if alloc, err := a.allocs.GetClientAlloc(allocID); err == nil {
// Local alloc, return early if already terminated
if alloc.Terminated() {
return "", nil
}
} else {
// Remote alloc, setup blocking rpc call
go a.watchRemote(ctx, allocID)
}
select {
case node := <-terminatedCh:
a.logger.Printf("[DEBUG] client: blocking alloc %q exited", allocID)
//TODO migrate?!
return node, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// watch for an alloc to terminate. Returns an error if there's already a
// watcher because blocked allocs and blockers must be 1:1.
func (a *allocBlocker) watch(allocID string) (<-chan string, error) {
a.blockingLock.Lock()
defer a.blockingLock.Unlock()
ch, ok := a.blocking[allocID]
if ok {
return nil, fmt.Errorf("multiple blockers on alloc %q", allocID)
}
ch = make(chan string)
a.blocking[allocID] = ch
return ch, nil
}
// watch for a non-local alloc to terminate using a blocking rpc call
func (a *allocBlocker) watchRemote(ctx context.Context, allocID string) {
req := structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Region: a.region,
AllowStale: true,
},
}
for {
resp := structs.SingleAllocResponse{}
err := a.allocs.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
a.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err)
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-ctx.Done():
return
}
}
if resp.Alloc == nil {
//TODO(schmichael) confirm this assumption
a.logger.Printf("[DEBUG] client: blocking alloc %q has been GC'd", allocID)
a.allocTerminated(allocID, "")
return
}
if resp.Alloc.Terminated() {
// Terminated!
a.allocTerminated(allocID, resp.Alloc.NodeID)
return
}
// Update the query index and requery.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}
}
}
// GetNodeAddr gets the HTTP address of the node with the given Node ID from the server
func (a *allocBlocker) GetNodeAddr(ctx context.Context, nodeID string) (string, error) {
req := structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{
Region: a.region,
AllowStale: true,
},
}
resp := structs.SingleNodeResponse{}
for {
err := a.allocs.RPC("Node.GetNode", &req, &resp)
if err != nil {
a.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err)
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-ctx.Done():
return "", ctx.Err()
}
}
break
}
if resp.Node == nil {
return "", fmt.Errorf("node %q not found", nodeID)
}
scheme := "http://"
if resp.Node.TLSEnabled {
scheme = "https://"
}
return scheme + resp.Node.HTTPAddr, nil
}


@ -80,10 +80,14 @@ type AllocRunner struct {
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI
otherAllocDir *allocdir.AllocDir
prevAlloc prevAllocWatcher
// blocker blocks until a previous allocation has terminated
blocker *allocBlocker
// blocked and migrating are true when alloc runner is waiting on the
// prevAllocWatcher. Writers must acquire the waitingLock and readers
// should use the helper methods Blocked and Migrating.
blocked bool
migrating bool
waitingLock sync.RWMutex
ctx context.Context
exitFn context.CancelFunc
@ -153,7 +157,7 @@ type allocRunnerMutableState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient, consulClient ConsulServiceAPI,
blocker *allocBlocker, prevAllocDir *allocdir.AllocDir) *AllocRunner {
prevAlloc prevAllocWatcher) *AllocRunner {
ar := &AllocRunner{
config: config,
@ -163,8 +167,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
alloc: alloc,
allocID: alloc.ID,
allocBroadcast: cstructs.NewAllocBroadcaster(8),
blocker: blocker,
otherAllocDir: prevAllocDir,
prevAlloc: prevAlloc,
dirtyCh: make(chan struct{}, 1),
allocDir: allocdir.NewAllocDir(logger, filepath.Join(config.AllocDir, alloc.ID)),
tasks: make(map[string]*TaskRunner),
@ -481,6 +484,12 @@ func (r *AllocRunner) GetAllocDir() *allocdir.AllocDir {
return r.allocDir
}
// GetListener returns a listener for updates broadcast by this alloc runner.
// Callers are responsible for calling Close on their Listener.
func (r *AllocRunner) GetListener() *cstructs.AllocListener {
return r.allocBroadcast.Listen()
}
// copyTaskStates returns a copy of the passed task states.
func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.TaskState {
copy := make(map[string]*structs.TaskState, len(states))
@ -757,39 +766,36 @@ func (r *AllocRunner) Run() {
}
r.allocDirLock.Unlock()
// If there was a previous allocation block until it has terminated
if alloc.PreviousAllocation != "" {
//TODO remove me
r.logger.Printf("[TRACE] client: XXXX blocking %q on -> %q", alloc.ID, alloc.PreviousAllocation)
if nodeID, err := r.blocker.BlockOnAlloc(r.ctx, alloc.PreviousAllocation); err != nil {
if err == context.Canceled {
// Exiting
return
}
// Non-canceled errors are fatal as an invariant has been broken.
r.logger.Printf("[ERR] client: alloc %q encountered an error waiting for alloc %q to terminate: %v",
alloc.ID, alloc.PreviousAllocation, err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("error waiting for alloc %q to terminate: %v",
alloc.PreviousAllocation, err))
// Wait for a previous alloc - if any - to terminate
r.waitingLock.Lock()
r.blocked = true
r.waitingLock.Unlock()
if err := r.prevAlloc.Wait(r.ctx); err != nil {
if err == context.Canceled {
return
}
// Move data if there's a previous alloc dir and sticky volumes is on
if tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky {
if err := r.migrateAllocDir(r.ctx, nodeID); err != nil {
if err == context.Canceled {
// Exiting
return
}
//TODO(schmichael) task event?
r.logger.Printf("[WARN] client: alloc %q encountered an error migrating data from previous alloc %q: %v",
alloc.ID, alloc.PreviousAllocation, err)
}
}
//TODO(schmichael)
panic("todo")
}
r.waitingLock.Lock()
r.blocked = false
r.migrating = true
r.waitingLock.Unlock()
// Wait for data to be migrated from a previous alloc if applicable
if err := r.prevAlloc.Migrate(r.ctx, r.allocDir); err != nil {
if err == context.Canceled {
return
}
//TODO(schmichael)
panic("todo")
}
r.waitingLock.Lock()
r.migrating = false
r.waitingLock.Unlock()
// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
@ -888,35 +894,6 @@ OUTER:
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.allocID)
}
// migrateAllocDir handles migrating data from a previous alloc. Only call from
// Run.
func (r *AllocRunner) migrateAllocDir(ctx context.Context, nodeID string) error {
if r.otherAllocDir != nil {
// The other alloc was local, move it over
if err := r.allocDir.Move(r.otherAllocDir, tg.Tasks); err != nil {
r.logger.Printf("[ERR] client: failed to move alloc dir into alloc %q: %v", alloc.ID, err)
}
//TODO(schmichael) task event?
if err := r.otherAllocDir.Destroy(); err != nil {
r.logger.Printf("[ERR] client: error destroying allocdir %v: %v", r.otherAllocDir.AllocDir, err)
}
return nil
}
// No previous alloc dir set, try to migrate from a remote node
if tg.EphemeralDisk.Migrate {
if nodeID == "" {
return fmt.Errorf("unable to find remote node")
}
//TODO(schmichael) add a timeout to context?
nodeAddr, err := r.blocker.GetNodeAddr(ctx, nodeID)
if err != nil {
return err
}
}
}
// destroyTaskRunners destroys the task runners, waits for them to terminate and
// then saves state.
func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
@ -999,6 +976,24 @@ func (r *AllocRunner) handleDestroy() {
}
}
// Blocked returns true if this alloc is waiting on a previous allocation to
// terminate.
func (r *AllocRunner) Blocked() bool {
r.waitingLock.RLock()
b := r.blocked
r.waitingLock.RUnlock()
return b
}
// Migrating returns true if this alloc is migrating data from a previous
// allocation.
func (r *AllocRunner) Migrating() bool {
r.waitingLock.RLock()
m := r.migrating
r.waitingLock.RUnlock()
return m
}
// Update is used to update the allocation of the context
func (r *AllocRunner) Update(update *structs.Allocation) {
select {

client/alloc_watcher.go (new file, 401 lines added)

@ -0,0 +1,401 @@
package client
import (
"archive/tar"
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
"github.com/hashicorp/consul/lib"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
type rpcer interface {
// RPC allows retrieving remote allocs.
RPC(method string, args interface{}, reply interface{}) error
}
type prevAllocWatcher interface {
// Wait for previous alloc to terminate
Wait(context.Context) error
// Migrate data from previous alloc
Migrate(ctx context.Context, dest *allocdir.AllocDir) error
}
// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher {
if alloc.PreviousAllocation == "" {
// No previous allocation, use noop transitioner
return noopPrevAlloc{}
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if prevAR != nil {
// Previous allocation is local, use local transitioner
return &localPrevAlloc{
allocID: alloc.ID,
prevAllocID: alloc.PreviousAllocation,
tasks: tg.Tasks,
sticky: tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky,
prevAllocDir: prevAR.GetAllocDir(),
prevListener: prevAR.GetListener(),
prevWaitCh: prevAR.WaitCh(),
logger: l,
}
}
return &remotePrevAlloc{
allocID: alloc.ID,
prevAllocID: alloc.PreviousAllocation,
tasks: tg.Tasks,
config: config,
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
rpc: rpc,
logger: l,
}
}
// localPrevAlloc is a prevAllocWatcher for previous allocations on the same
// node as an updated allocation.
type localPrevAlloc struct {
allocID string
prevAllocID string
tasks []*structs.Task
sticky bool
prevAllocDir *allocdir.AllocDir
prevListener *cstructs.AllocListener
prevWaitCh <-chan struct{}
logger *log.Logger
}
// Wait for the previous, local alloc to terminate, its alloc runner to exit, or the context to be done.
func (p *localPrevAlloc) Wait(ctx context.Context) error {
defer p.prevListener.Close()
p.logger.Printf("[DEBUG] client: alloc %q waiting for previous alloc %q to terminate", p.allocID, p.prevAllocID)
for {
select {
case prevAlloc := <-p.prevListener.Ch:
if prevAlloc.Terminated() {
return nil
}
case <-p.prevWaitCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
// Migrate from previous local alloc dir to destination alloc dir.
func (p *localPrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) error {
if !p.sticky {
// Not a sticky volume, nothing to migrate
return nil
}
p.logger.Printf("[DEBUG] client: alloc %q copying previous alloc %q", p.allocID, p.prevAllocID)
if err := dest.Move(p.prevAllocDir, p.tasks); err != nil {
p.logger.Printf("[ERR] client: failed to move previous alloc dir %q: %v", p.prevAllocDir.AllocDir, err)
}
if err := p.prevAllocDir.Destroy(); err != nil {
p.logger.Printf("[ERR] client: error destroying allocdir %v: %v", p.prevAllocDir.AllocDir, err)
}
return nil
}
// remotePrevAlloc is a prevAllocWatcher for previous allocations on a
// different node than the updated allocation.
type remotePrevAlloc struct {
allocID string
prevAllocID string
tasks []*structs.Task
config *config.Config
migrate bool
rpc rpcer
// nodeID is the node the previous alloc ran on. Set by Wait() for use in
// Migrate() iff the previous alloc has not already been GC'd.
nodeID string
logger *log.Logger
}
// Wait for the remote previous alloc to terminate or be GC'd, polling the
// servers with a blocking query.
func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Printf("[DEBUG] client: alloc %q waiting for remote previous alloc %q to terminate", p.allocID, p.prevAllocID)
req := structs.AllocSpecificRequest{
AllocID: p.prevAllocID,
QueryOptions: structs.QueryOptions{
Region: p.config.Region,
AllowStale: true,
},
}
done := func() bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
for !done() {
resp := structs.SingleAllocResponse{}
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
p.logger.Printf("[ERR] client: failed to query previous alloc %q: %v", p.prevAllocID, err)
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-ctx.Done():
return ctx.Err()
}
}
if resp.Alloc == nil {
p.logger.Printf("[DEBUG] client: blocking alloc %q has been GC'd", p.prevAllocID)
return nil
}
if resp.Alloc.Terminated() {
// Terminated!
p.nodeID = resp.Alloc.NodeID
return nil
}
// Update the query index and requery.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}
}
if done() {
return ctx.Err()
}
return nil
}
// Migrate alloc data from a remote node if the new alloc has migration enabled
// and the old alloc hasn't been GC'd.
func (p *remotePrevAlloc) Migrate(ctx context.Context, dest *allocdir.AllocDir) error {
if !p.migrate {
// Volume wasn't configured to be migrated, return early
return nil
}
p.logger.Printf("[DEBUG] client: alloc %q copying from remote previous alloc %q", p.allocID, p.prevAllocID)
if p.nodeID == "" {
// NodeID couldn't be found; likely alloc was GC'd
p.logger.Printf("[WARN] client: alloc %q couldn't migrate data from previous alloc %q; previous alloc may have been GC'd",
p.allocID, p.prevAllocID)
return nil
}
addr, err := p.getNodeAddr(ctx, p.nodeID)
if err != nil {
return err
}
prevAllocDir, err := p.migrateAllocDir(ctx, addr)
if err != nil {
return err
}
if err := dest.Move(prevAllocDir, p.tasks); err != nil {
// cleanup on error
prevAllocDir.Destroy()
return err
}
if err := prevAllocDir.Destroy(); err != nil {
p.logger.Printf("[ERR] client: error destroying allocdir %q: %v", prevAllocDir.AllocDir, err)
}
return nil
}
// getNodeAddr gets the HTTP address of the node with the given Node ID from the server
func (p *remotePrevAlloc) getNodeAddr(ctx context.Context, nodeID string) (string, error) {
req := structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{
Region: p.config.Region,
AllowStale: true,
},
}
resp := structs.SingleNodeResponse{}
for {
err := p.rpc.RPC("Node.GetNode", &req, &resp)
if err != nil {
p.logger.Printf("[ERR] client: failed to query node info %q: %v", nodeID, err)
retry := getAllocRetryIntv + lib.RandomStagger(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-ctx.Done():
return "", ctx.Err()
}
}
break
}
if resp.Node == nil {
return "", fmt.Errorf("node %q not found", nodeID)
}
scheme := "http://"
if resp.Node.TLSEnabled {
scheme = "https://"
}
return scheme + resp.Node.HTTPAddr, nil
}
// migrate a remote alloc dir to local node
func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) (*allocdir.AllocDir, error) {
// Create the previous alloc dir
prevAllocDir := allocdir.NewAllocDir(p.logger, filepath.Join(p.config.AllocDir, p.prevAllocID))
if err := prevAllocDir.Build(); err != nil {
return nil, fmt.Errorf("error building alloc dir for previous alloc %q: %v", p.prevAllocID, err)
}
// Create an API client
apiConfig := nomadapi.DefaultConfig()
apiConfig.Address = nodeAddr
apiConfig.TLSConfig = &nomadapi.TLSConfig{
CACert: p.config.TLSConfig.CAFile,
ClientCert: p.config.TLSConfig.CertFile,
ClientKey: p.config.TLSConfig.KeyFile,
}
apiClient, err := nomadapi.NewClient(apiConfig)
if err != nil {
return nil, err
}
url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID)
resp, err := apiClient.Raw().Response(url, nil)
if err != nil {
prevAllocDir.Destroy()
return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err)
}
if err := p.streamAllocDir(ctx, resp, prevAllocDir.AllocDir); err != nil {
prevAllocDir.Destroy()
return nil, err
}
return prevAllocDir, nil
}
// stream a remote alloc dir to a local path. Caller should cleanup dest on
// error.
func (p *remotePrevAlloc) streamAllocDir(ctx context.Context, resp io.ReadCloser, dest string) error {
p.logger.Printf("[DEBUG] client: alloc %q streaming snapshot of previous alloc %q to %q", p.allocID, p.prevAllocID, dest)
tr := tar.NewReader(resp)
defer resp.Close()
canceled := func() bool {
select {
case <-ctx.Done():
p.logger.Printf("[INFO] client: stopping migration of previous alloc %q for new alloc: %v",
p.prevAllocID, p.allocID)
return true
default:
return false
}
}
buf := make([]byte, 1024)
for !canceled() {
// Get the next header
hdr, err := tr.Next()
// Snapshot has ended
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("error streaming previous alloc %p for new alloc %q: %v",
p.prevAllocID, p.allocID, err)
}
// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(dest, hdr.Name), os.FileMode(hdr.Mode))
continue
}
// If the header is for a symlink we create the symlink
if hdr.Typeflag == tar.TypeSymlink {
if err = os.Symlink(hdr.Linkname, filepath.Join(dest, hdr.Name)); err != nil {
return fmt.Errorf("error creating symlink: %v", err)
}
continue
}
// If the header is a file, we write to a file
if hdr.Typeflag == tar.TypeReg {
f, err := os.Create(filepath.Join(dest, hdr.Name))
if err != nil {
return fmt.Errorf("error creating file: %v", err)
}
// Setting the permissions of the file as the origin.
if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil {
f.Close()
return fmt.Errorf("error chmoding file %v", err)
}
if err := f.Chown(hdr.Uid, hdr.Gid); err != nil {
f.Close()
return fmt.Errorf("error chowning file %v", err)
}
// We write in chunks so that we can test if the client
// is still alive
for !canceled() {
n, err := tr.Read(buf)
if err != nil {
f.Close()
if err != io.EOF {
return fmt.Errorf("error reading snapshot: %v", err)
}
break
}
if _, err := f.Write(buf[:n]); err != nil {
f.Close()
return fmt.Errorf("error writing to file %q: %v", f.Name(), err)
}
}
}
}
if canceled() {
return ctx.Err()
}
return nil
}
// noopPrevAlloc does not block or migrate on a previous allocation and never
// returns an error.
type noopPrevAlloc struct{}
// Wait returns nil immediately.
func (noopPrevAlloc) Wait(context.Context) error { return nil }
// Migrate returns nil immediately.
func (noopPrevAlloc) Migrate(context.Context, *allocdir.AllocDir) error { return nil }


@ -65,6 +65,9 @@ type AllocDir struct {
// TaskDirs is a mapping of task names to their non-shared directory.
TaskDirs map[string]*TaskDir
// built is true if Build has successfully run
built bool
logger *log.Logger
}
@ -188,6 +191,11 @@ func (d *AllocDir) Snapshot(w io.Writer) error {
// Move other alloc directory's shared path and local dir to this alloc dir.
func (d *AllocDir) Move(other *AllocDir, tasks []*structs.Task) error {
if !d.built {
// Enforce the invariant that Build is called before Move
return fmt.Errorf("unable to move to %q - alloc dir is not built", d.AllocDir)
}
// Move the data directory
otherDataDir := filepath.Join(other.SharedDir, SharedDataDir)
dataDir := filepath.Join(d.SharedDir, SharedDataDir)
@ -296,6 +304,8 @@ func (d *AllocDir) Build() error {
}
}
// Mark as built
d.built = true
return nil
}
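The built flag added above lets Move fail fast when the destination was never built. A minimal illustration of the invariant, using only the AllocDir methods visible in these diffs (the path and surrounding variables are hypothetical):

// Build must succeed before Move is called on the destination dir;
// Move now returns an error otherwise instead of copying into a
// directory tree that does not exist yet.
dest := allocdir.NewAllocDir(logger, "/var/nomad/alloc/new-alloc-id") // hypothetical path
if err := dest.Build(); err != nil {
	return err
}
if err := dest.Move(prevAllocDir, tg.Tasks); err != nil {
	return err
}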


@ -1,10 +1,8 @@
package client
import (
"archive/tar"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
@ -20,7 +18,6 @@ import (
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
nomadapi "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
@ -130,18 +127,6 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex
// allocBlocker is used to block until local or remote allocations terminate
allocBlocker *allocBlocker
// blockedAllocations are allocations which are blocked because their
// chained allocations haven't finished running
blockedAllocations map[string]*structs.Allocation
blockedAllocsLock sync.RWMutex
// migratingAllocs is the set of allocs whose data migration is in flight
migratingAllocs map[string]*migrateAllocCtrl
migratingAllocsLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
@ -167,34 +152,6 @@ type Client struct {
garbageCollector *AllocGarbageCollector
}
// migrateAllocCtrl indicates whether migration is complete
type migrateAllocCtrl struct {
alloc *structs.Allocation
ch chan struct{}
closed bool
chLock sync.Mutex
}
func newMigrateAllocCtrl(alloc *structs.Allocation) *migrateAllocCtrl {
return &migrateAllocCtrl{
ch: make(chan struct{}),
alloc: alloc,
}
}
func (m *migrateAllocCtrl) closeCh() {
m.chLock.Lock()
defer m.chLock.Unlock()
if m.closed {
return
}
// If channel is not closed then close it
m.closed = true
close(m.ch)
}
var (
// noServersErr is returned by the RPC method when the client has no
// configured servers. This is used to trigger Consul discovery if
@ -223,10 +180,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
migratingAllocs: make(map[string]*migrateAllocCtrl),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
@ -300,10 +255,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to setup vault client: %v", err)
}
// Create an alloc blocker for tracking alloc terminations; must exist
// before NewAllocRunner is called by methods like restoreState and run
c.allocBlocker = newAllocBlocker(c.logger, c, c.config.Region)
// Restore the state
if err := c.restoreState(); err != nil {
logger.Printf("[ERR] client: failed to restore state: %v", err)
@ -664,8 +615,11 @@ func (c *Client) restoreState() error {
for _, id := range allocs {
alloc := &structs.Allocation{ID: id}
// don't worry about blocking/migrating when restoring
watcher := noopPrevAlloc{}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
c.configLock.RUnlock()
c.allocLock.Lock()
@ -742,15 +696,6 @@ func (c *Client) NumAllocs() int {
c.allocLock.RLock()
n := len(c.allocs)
c.allocLock.RUnlock()
c.blockedAllocsLock.RLock()
n += len(c.blockedAllocations)
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.RLock()
n += len(c.migratingAllocs)
c.migratingAllocsLock.RUnlock()
return n
}
@ -1258,22 +1203,9 @@ func (c *Client) updateNodeStatus() error {
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// If this alloc was blocking another alloc and transitioned to a
// terminal state then start the blocked allocation
if alloc.Terminated() {
// Mark the allocation for GC if it is in terminal state and won't be migrated
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil || tg.EphemeralDisk == nil || !tg.EphemeralDisk.Sticky {
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
c.garbageCollector.MarkForCollection(ar)
}
}
}
// If this allocation isn't pending and had a previous allocation, the
// previous allocation is now free to be GC'd.
if alloc.ClientStatus != structs.AllocClientStatusPending && alloc.PreviousAllocation != "" {
if ar, ok := c.getAllocRunners()[alloc.PreviousAllocation]; ok {
// Terminated, mark for GC
if ar, ok := c.getAllocRunners()[alloc.ID]; ok {
c.garbageCollector.MarkForCollection(ar)
}
}
@ -1576,340 +1508,17 @@ func (c *Client) runAllocs(update *allocUpdates) {
c.logger.Printf("[ERR] client: failed to update alloc %q: %v",
update.exist.ID, err)
}
// See if the updated alloc is getting migrated
c.migratingAllocsLock.RLock()
ch, ok := c.migratingAllocs[update.updated.ID]
c.migratingAllocsLock.RUnlock()
if ok {
// Stopping the migration if the allocation doesn't need any
// migration
if !update.updated.ShouldMigrate() {
ch.closeCh()
}
}
}
// Start the new allocations
for _, add := range diff.added {
// If the allocation is chained and the previous allocation hasn't
// terminated yet, then add the alloc to the blocked queue.
c.blockedAllocsLock.Lock()
ar, ok := c.getAllocRunners()[add.PreviousAllocation]
if ok && !ar.Alloc().Terminated() {
// Check if the alloc is already present in the blocked allocations
// map
if _, ok := c.blockedAllocations[add.PreviousAllocation]; !ok {
c.logger.Printf("[DEBUG] client: added alloc %q to blocked queue for previous alloc %q",
add.ID, add.PreviousAllocation)
c.blockedAllocations[add.PreviousAllocation] = add
}
c.blockedAllocsLock.Unlock()
continue
}
c.blockedAllocsLock.Unlock()
// This means the allocation has a previous allocation on another node
// so we will block for the previous allocation to complete
if add.PreviousAllocation != "" && !ok {
// Ensure that we are not blocking for the remote allocation if we
// have already blocked
c.migratingAllocsLock.Lock()
if _, ok := c.migratingAllocs[add.ID]; !ok {
// Check that we don't have an alloc runner already. This
// prevents a race between a finishing blockForRemoteAlloc and
// another invocation of runAllocs
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
c.migratingAllocs[add.ID] = newMigrateAllocCtrl(add)
go c.blockForRemoteAlloc(add)
}
}
c.migratingAllocsLock.Unlock()
continue
}
// Setting the previous allocdir if the allocation had a terminal
// previous allocation
var prevAllocDir *allocdir.AllocDir
tg := add.Job.LookupTaskGroup(add.TaskGroup)
if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && ar != nil {
prevAllocDir = ar.GetAllocDir()
}
if err := c.addAlloc(add, prevAllocDir); err != nil {
if err := c.addAlloc(add); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
add.ID, err)
}
}
}
// blockForRemoteAlloc blocks until the previous allocation of an allocation has
// been terminated and migrates the snapshot data
func (c *Client) blockForRemoteAlloc(alloc *structs.Allocation) {
// Removing the allocation from the set of allocs which are currently
// undergoing migration
defer func() {
c.migratingAllocsLock.Lock()
delete(c.migratingAllocs, alloc.ID)
c.migratingAllocsLock.Unlock()
}()
// prevAllocDir is the allocation directory of the previous allocation
var prevAllocDir *allocdir.AllocDir
// If the allocation is not sticky then we won't wait for the previous
// allocation to be terminal
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
c.logger.Printf("[ERR] client: task group %q not found in job %q", tg.Name, alloc.Job.ID)
goto ADDALLOC
}
// Wait for the remote previous alloc to be terminal if the alloc is sticky
if tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && tg.EphemeralDisk.Migrate {
c.logger.Printf("[DEBUG] client: blocking alloc %q for previous allocation %q", alloc.ID, alloc.PreviousAllocation)
// Block until the previous allocation migrates to terminal state
stopCh := c.migratingAllocs[alloc.ID]
prevAlloc, err := c.waitForAllocTerminal(alloc.PreviousAllocation, stopCh)
if err != nil {
c.logger.Printf("[ERR] client: error waiting for allocation %q: %v",
alloc.PreviousAllocation, err)
}
// Migrate the data from the remote node
prevAllocDir, err = c.migrateRemoteAllocDir(prevAlloc, alloc.ID)
if err != nil {
c.logger.Printf("[ERR] client: error migrating data from remote alloc %q: %v",
alloc.PreviousAllocation, err)
}
}
ADDALLOC:
// Add the allocation
if err := c.addAlloc(alloc, prevAllocDir); err != nil {
c.logger.Printf("[ERR] client: error adding alloc: %v", err)
}
}
// waitForAllocTerminal waits for an allocation with the given alloc id to
// transition to terminal state and blocks the caller until then.
func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) {
req := structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: true,
},
}
for {
resp := structs.SingleAllocResponse{}
err := c.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query allocation %q: %v", allocID, err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-time.After(retry):
continue
case <-stopCh.ch:
return nil, fmt.Errorf("giving up waiting on alloc %q since migration is not needed", allocID)
case <-c.shutdownCh:
return nil, fmt.Errorf("aborting because client is shutting down")
}
}
if resp.Alloc == nil {
return nil, nil
}
if resp.Alloc.Terminated() {
return resp.Alloc, nil
}
// Update the query index.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}
}
}
// migrateRemoteAllocDir migrates the allocation directory from a remote node to
// the current node
func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string) (*allocdir.AllocDir, error) {
if alloc == nil {
return nil, nil
}
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return nil, fmt.Errorf("Task Group %q not found in job %q", tg.Name, alloc.Job.ID)
}
// Skip migration of data if the ephemeral disk is not sticky or
// migration is turned off.
if tg.EphemeralDisk == nil || !tg.EphemeralDisk.Sticky || !tg.EphemeralDisk.Migrate {
return nil, nil
}
node, err := c.getNode(alloc.NodeID)
// If the node is down then skip migrating the data
if err != nil {
return nil, fmt.Errorf("error retreiving node %v: %v", alloc.NodeID, err)
}
// Check if node is nil
if node == nil {
return nil, fmt.Errorf("node %q doesn't exist", alloc.NodeID)
}
// skip migration if the remote node is down
if node.Status == structs.NodeStatusDown {
c.logger.Printf("[INFO] client: not migrating data from alloc %q since node %q is down", alloc.ID, alloc.NodeID)
return nil, nil
}
// Create the previous alloc dir
pathToAllocDir := filepath.Join(c.config.AllocDir, alloc.ID)
if err := os.MkdirAll(pathToAllocDir, 0777); err != nil {
c.logger.Printf("[ERR] client: error creating previous allocation dir: %v", err)
}
// Get the snapshot
scheme := "http"
if node.TLSEnabled {
scheme = "https"
}
// Create an API client
apiConfig := nomadapi.DefaultConfig()
apiConfig.Address = fmt.Sprintf("%s://%s", scheme, node.HTTPAddr)
apiConfig.TLSConfig = &nomadapi.TLSConfig{
CACert: c.config.TLSConfig.CAFile,
ClientCert: c.config.TLSConfig.CertFile,
ClientKey: c.config.TLSConfig.KeyFile,
}
apiClient, err := nomadapi.NewClient(apiConfig)
if err != nil {
return nil, err
}
url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", alloc.ID)
resp, err := apiClient.Raw().Response(url, nil)
if err != nil {
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[ERR] client: error getting snapshot for alloc %q: %v", alloc.ID, err)
return nil, fmt.Errorf("error getting snapshot for alloc %q: %v", alloc.ID, err)
}
if err := c.unarchiveAllocDir(resp, allocID, pathToAllocDir); err != nil {
return nil, err
}
// If there were no errors then we create the allocdir
prevAllocDir := allocdir.NewAllocDir(c.logger, pathToAllocDir)
return prevAllocDir, nil
}
// unarchiveAllocDir reads the stream of a compressed allocation directory and
// writes them to the disk.
func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAllocDir string) error {
tr := tar.NewReader(resp)
defer resp.Close()
buf := make([]byte, 1024)
stopMigrating, ok := c.migratingAllocs[allocID]
if !ok {
os.RemoveAll(pathToAllocDir)
return fmt.Errorf("Allocation %q is not marked for remote migration", allocID)
}
for {
// See if the alloc still needs migration
select {
case <-stopMigrating.ch:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
return nil
case <-c.shutdownCh:
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", allocID)
return nil
default:
}
// Get the next header
hdr, err := tr.Next()
// Snapshot has ended
if err == io.EOF {
return nil
}
// If there is an error then we avoid creating the alloc dir
if err != nil {
os.RemoveAll(pathToAllocDir)
return fmt.Errorf("error creating alloc dir for alloc %q: %v", allocID, err)
}
// If the header is for a directory we create the directory
if hdr.Typeflag == tar.TypeDir {
os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), os.FileMode(hdr.Mode))
continue
}
// If the header is for a symlink we create the symlink
if hdr.Typeflag == tar.TypeSymlink {
if err = os.Symlink(hdr.Linkname, filepath.Join(pathToAllocDir, hdr.Name)); err != nil {
c.logger.Printf("[ERR] client: error creating symlink: %v", err)
}
continue
}
// If the header is a file, we write to a file
if hdr.Typeflag == tar.TypeReg {
f, err := os.Create(filepath.Join(pathToAllocDir, hdr.Name))
if err != nil {
c.logger.Printf("[ERR] client: error creating file: %v", err)
continue
}
// Setting the permissions of the file as the origin.
if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil {
f.Close()
c.logger.Printf("[ERR] client: error chmod-ing file %s: %v", f.Name(), err)
return fmt.Errorf("error chmoding file %v", err)
}
if err := f.Chown(hdr.Uid, hdr.Gid); err != nil {
f.Close()
c.logger.Printf("[ERR] client: error chown-ing file %s: %v", f.Name(), err)
return fmt.Errorf("error chowning file %v", err)
}
// We write in chunks so that we can test if
// the client is still alive
for {
if c.shutdown {
f.Close()
os.RemoveAll(pathToAllocDir)
c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", allocID)
return nil
}
n, err := tr.Read(buf)
if err != nil {
f.Close()
if err != io.EOF {
return fmt.Errorf("error reading snapshot: %v", err)
}
break
}
if _, err := f.Write(buf[:n]); err != nil {
f.Close()
os.RemoveAll(pathToAllocDir)
return fmt.Errorf("error writing to file %q: %v", f.Name(), err)
}
}
}
}
}
// removeAlloc is invoked when we should remove an allocation
func (c *Client) removeAlloc(alloc *structs.Allocation) error {
c.allocLock.Lock()
@ -1954,18 +1563,17 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
return nil
}
// If the previous allocation is local, pass in its allocation dir
var prevAllocDir *allocdir.AllocDir
tg := alloc.Job.LookupTaskGroup(add.TaskGroup)
if tg != nil && tg.EphemeralDisk != nil && tg.EphemeralDisk.Sticky && ar != nil {
if prevAR, ok := c.allocs[alloc.PreviousAllocation]; ok {
prevAllocDir = prevAR.GetAllocDir()
}
// get the previous alloc runner - if one exists - for the
// blocking/migrating watcher
var prevAR *AllocRunner
if alloc.PreviousAllocation != "" {
prevAR = c.allocs[alloc.PreviousAllocation]
}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, c.allocBlocker, prevAllocDir)
ar.SetPreviousAllocDir(prevAllocDir)
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
c.configLock.RUnlock()
// Store the alloc runner.
@ -1991,8 +1599,8 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
// with vault.
func (c *Client) setupVaultClient() error {
var err error
if c.vaultClient, err =
vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken); err != nil {
c.vaultClient, err = vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken)
if err != nil {
return err
}
@ -2325,19 +1933,18 @@ func (c *Client) emitClientMetrics() {
nodeID := c.Node().ID
// Emit allocation metrics
c.blockedAllocsLock.RLock()
blocked := len(c.blockedAllocations)
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.RLock()
migrating := len(c.migratingAllocs)
c.migratingAllocsLock.RUnlock()
pending, running, terminal := 0, 0, 0
blocked, migrating, pending, running, terminal := 0, 0, 0, 0, 0
for _, ar := range c.getAllocRunners() {
switch ar.Alloc().ClientStatus {
case structs.AllocClientStatusPending:
pending++
switch {
case ar.Blocked():
blocked++
case ar.Migrating():
migrating++
default:
pending++
}
case structs.AllocClientStatusRunning:
running++
case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
@ -2398,22 +2005,12 @@ func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resource
// allAllocs returns all the allocations managed by the client
func (c *Client) allAllocs() map[string]*structs.Allocation {
allocs := make(map[string]*structs.Allocation, 16)
ars := c.getAllocRunners()
allocs := make(map[string]*structs.Allocation, len(ars))
for _, ar := range c.getAllocRunners() {
a := ar.Alloc()
allocs[a.ID] = a
}
c.blockedAllocsLock.RLock()
for _, alloc := range c.blockedAllocations {
allocs[alloc.ID] = alloc
}
c.blockedAllocsLock.RUnlock()
c.migratingAllocsLock.RLock()
for _, ctrl := range c.migratingAllocs {
allocs[ctrl.alloc.ID] = ctrl.alloc
}
c.migratingAllocsLock.RUnlock()
return allocs
}