// Source: open-nomad/client/allocdir/alloc_dir.go

package allocdir
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
"gopkg.in/tomb.v1"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hpcloud/tail/watch"
)
var (
	// SharedAllocName is the name of the directory that is shared across
	// tasks in a task group.
	SharedAllocName = "alloc"

	// LogDirName is the name of the directory where logs of tasks are
	// written.
	LogDirName = "logs"

	// SharedAllocDirs is the set of directories that exist inside each
	// shared alloc directory.
	SharedAllocDirs = []string{LogDirName, "tmp", "data"}

	// TaskLocal is the name of the directory that exists inside each task
	// directory regardless of driver.
	TaskLocal = "local"

	// TaskDirs is the set of directories created in each task's directory.
	TaskDirs = []string{"tmp"}
)
// AllocDir describes the on-disk directory layout of a single allocation.
type AllocDir struct {
	// AllocDir is the root directory holding all state of this allocation.
	// It is purged when the allocation is destroyed.
	AllocDir string

	// SharedDir is the directory made available to every task within the
	// same task group.
	SharedDir string

	// TaskDirs maps each task name to that task's own, non-shared
	// directory.
	TaskDirs map[string]string
}
// AllocFileInfo holds information about a file inside the AllocDir.
type AllocFileInfo struct {
	// Name is the base name of the file.
	Name string

	// IsDir reports whether the entry is a directory.
	IsDir bool

	// Size is the length of the file in bytes.
	Size int64

	// FileMode is the string form of the file's mode bits, as produced by
	// os.FileMode.String (e.g. "-rw-r--r--").
	FileMode string

	// ModTime is the last modification time of the file.
	ModTime time.Time
}
// AllocDirFS exposes file operations on the alloc dir
2016-01-14 21:35:42 +00:00
type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
2016-07-06 00:08:58 +00:00
ReadAt(path string, offset int64) (io.ReadCloser, error)
2016-07-18 16:48:29 +00:00
BlockUntilExists(path string, t *tomb.Tomb) chan error
2016-07-06 00:08:58 +00:00
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
2016-01-14 21:35:42 +00:00
}
// NewAllocDir returns an AllocDir rooted at allocDir. The shared directory is
// set to the SharedAllocName subdirectory of allocDir and the task directory
// map starts out empty. Nothing is created on disk.
func NewAllocDir(allocDir string) *AllocDir {
	return &AllocDir{
		AllocDir:  allocDir,
		SharedDir: filepath.Join(allocDir, SharedAllocName),
		TaskDirs:  make(map[string]string),
	}
}
// Tears down previously build directory structure.
func (d *AllocDir) Destroy() error {
2015-09-25 23:49:14 +00:00
// Unmount all mounted shared alloc dirs.
var mErr multierror.Error
if err := d.UnmountAll(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := os.RemoveAll(d.AllocDir); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
func (d *AllocDir) UnmountAll() error {
var mErr multierror.Error
for _, dir := range d.TaskDirs {
// Check if the directory has the shared alloc mounted.
taskAlloc := filepath.Join(dir, SharedAllocName)
if d.pathExists(taskAlloc) {
if err := d.unmountSharedDir(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to unmount shared alloc dir %q: %v", taskAlloc, err))
} else if err := os.RemoveAll(taskAlloc); err != nil {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("failed to delete shared alloc dir %q: %v", taskAlloc, err))
}
2015-09-25 23:49:14 +00:00
}
// Unmount dev/ and proc/ have been mounted.
d.unmountSpecialDirs(dir)
2015-09-25 23:49:14 +00:00
}
return mErr.ErrorOrNil()
}
// Given a list of a task build the correct alloc structure.
func (d *AllocDir) Build(tasks []*structs.Task) error {
// Make the alloc directory, owned by the nomad process.
if err := os.MkdirAll(d.AllocDir, 0755); err != nil {
return fmt.Errorf("Failed to make the alloc directory %v: %v", d.AllocDir, err)
}
2016-05-15 16:41:34 +00:00
// Make the shared directory and make it available to all user/groups.
2016-03-28 21:33:53 +00:00
if err := os.MkdirAll(d.SharedDir, 0777); err != nil {
return err
}
2015-09-25 23:49:14 +00:00
// Make the shared directory have non-root permissions.
if err := d.dropDirPermissions(d.SharedDir); err != nil {
return err
}
for _, dir := range SharedAllocDirs {
p := filepath.Join(d.SharedDir, dir)
2016-03-28 21:33:53 +00:00
if err := os.MkdirAll(p, 0777); err != nil {
return err
}
if err := d.dropDirPermissions(p); err != nil {
return err
}
}
// Make the task directories.
for _, t := range tasks {
taskDir := filepath.Join(d.AllocDir, t.Name)
2016-03-28 21:33:53 +00:00
if err := os.MkdirAll(taskDir, 0777); err != nil {
return err
}
2015-09-25 23:49:14 +00:00
// Make the task directory have non-root permissions.
if err := d.dropDirPermissions(taskDir); err != nil {
return err
}
// Create a local directory that each task can use.
local := filepath.Join(taskDir, TaskLocal)
2016-03-28 21:33:53 +00:00
if err := os.MkdirAll(local, 0777); err != nil {
return err
}
2015-09-25 23:49:14 +00:00
if err := d.dropDirPermissions(local); err != nil {
return err
}
d.TaskDirs[t.Name] = taskDir
2016-02-04 23:35:04 +00:00
// Create the directories that should be in every task.
for _, dir := range TaskDirs {
local := filepath.Join(taskDir, dir)
2016-03-28 21:33:53 +00:00
if err := os.MkdirAll(local, 0777); err != nil {
2016-02-04 23:35:04 +00:00
return err
}
if err := d.dropDirPermissions(local); err != nil {
return err
}
}
}
return nil
}
// Embed takes a mapping of absolute directory or file paths on the host to
// their intended, relative location within the task directory. Embed attempts
2015-09-25 23:49:14 +00:00
// hardlink and then defaults to copying. If the path exists on the host and
2016-05-15 16:41:34 +00:00
// can't be embedded an error is returned.
func (d *AllocDir) Embed(task string, entries map[string]string) error {
taskdir, ok := d.TaskDirs[task]
if !ok {
return fmt.Errorf("Task directory doesn't exist for task %v", task)
}
subdirs := make(map[string]string)
for source, dest := range entries {
2015-09-25 23:49:14 +00:00
// Check to see if directory exists on host.
s, err := os.Stat(source)
if os.IsNotExist(err) {
2015-09-25 23:49:14 +00:00
continue
}
// Embedding a single file
if !s.IsDir() {
destDir := filepath.Join(taskdir, filepath.Dir(dest))
if err := os.MkdirAll(destDir, s.Mode().Perm()); err != nil {
return fmt.Errorf("Couldn't create destination directory %v: %v", destDir, err)
}
// Copy the file.
taskEntry := filepath.Join(destDir, filepath.Base(dest))
if err := d.linkOrCopy(source, taskEntry, s.Mode().Perm()); err != nil {
return err
}
continue
}
// Create destination directory.
destDir := filepath.Join(taskdir, dest)
if err := os.MkdirAll(destDir, s.Mode().Perm()); err != nil {
return fmt.Errorf("Couldn't create destination directory %v: %v", destDir, err)
}
// Enumerate the files in source.
dirEntries, err := ioutil.ReadDir(source)
if err != nil {
return fmt.Errorf("Couldn't read directory %v: %v", source, err)
}
for _, entry := range dirEntries {
hostEntry := filepath.Join(source, entry.Name())
2015-09-25 23:49:14 +00:00
taskEntry := filepath.Join(destDir, filepath.Base(hostEntry))
if entry.IsDir() {
subdirs[hostEntry] = filepath.Join(dest, filepath.Base(hostEntry))
continue
}
// Check if entry exists. This can happen if restarting a failed
// task.
if _, err := os.Lstat(taskEntry); err == nil {
continue
}
if !entry.Mode().IsRegular() {
// If it is a symlink we can create it, otherwise we skip it.
2015-09-25 23:49:14 +00:00
if entry.Mode()&os.ModeSymlink == 0 {
continue
2015-09-25 23:49:14 +00:00
}
link, err := os.Readlink(hostEntry)
if err != nil {
return fmt.Errorf("Couldn't resolve symlink for %v: %v", source, err)
}
if err := os.Symlink(link, taskEntry); err != nil {
// Symlinking twice
if err.(*os.LinkError).Err.Error() != "file exists" {
return fmt.Errorf("Couldn't create symlink: %v", err)
}
2015-09-25 23:49:14 +00:00
}
continue
}
if err := d.linkOrCopy(hostEntry, taskEntry, entry.Mode().Perm()); err != nil {
return err
}
}
}
// Recurse on self to copy subdirectories.
if len(subdirs) != 0 {
return d.Embed(task, subdirs)
}
return nil
}
// MountSharedDir mounts the shared directory into the specified task's
// directory. Mount is documented at an OS level in their respective
// implementation files.
func (d *AllocDir) MountSharedDir(task string) error {
taskDir, ok := d.TaskDirs[task]
if !ok {
return fmt.Errorf("No task directory exists for %v", task)
}
2015-09-25 23:49:14 +00:00
taskLoc := filepath.Join(taskDir, SharedAllocName)
if err := d.mountSharedDir(taskLoc); err != nil {
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
}
return nil
}
2016-02-25 04:06:43 +00:00
// LogDir returns the path of the log directory, which lives inside the shared
// directory of the current allocation directory.
func (d *AllocDir) LogDir() string {
	sharedDir := filepath.Join(d.AllocDir, SharedAllocName)
	return filepath.Join(sharedDir, LogDirName)
}
// List returns the list of files at a path relative to the alloc dir
2016-01-14 01:18:10 +00:00
func (d *AllocDir) List(path string) ([]*AllocFileInfo, error) {
p := filepath.Join(d.AllocDir, path)
finfos, err := ioutil.ReadDir(p)
if err != nil {
2016-01-14 19:47:05 +00:00
return []*AllocFileInfo{}, err
}
2016-01-14 01:18:10 +00:00
files := make([]*AllocFileInfo, len(finfos))
for idx, info := range finfos {
2016-01-14 01:18:10 +00:00
files[idx] = &AllocFileInfo{
2016-01-27 22:20:10 +00:00
Name: info.Name(),
IsDir: info.IsDir(),
Size: info.Size(),
FileMode: info.Mode().String(),
ModTime: info.ModTime(),
}
}
return files, err
}
// Stat returns information about the file at a path relative to the alloc dir
2016-01-14 01:18:10 +00:00
func (d *AllocDir) Stat(path string) (*AllocFileInfo, error) {
2016-01-13 05:28:07 +00:00
p := filepath.Join(d.AllocDir, path)
info, err := os.Stat(p)
2016-01-12 23:25:51 +00:00
if err != nil {
return nil, err
}
2016-01-14 01:18:10 +00:00
return &AllocFileInfo{
2016-01-27 22:20:10 +00:00
Size: info.Size(),
Name: info.Name(),
IsDir: info.IsDir(),
FileMode: info.Mode().String(),
ModTime: info.ModTime(),
2016-01-12 23:25:51 +00:00
}, nil
2016-01-13 05:28:07 +00:00
}
2016-01-12 23:25:51 +00:00
2016-07-06 00:08:58 +00:00
// ReadAt returns a reader for a file at the path relative to the alloc dir
func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
2016-01-13 05:28:07 +00:00
p := filepath.Join(d.AllocDir, path)
f, err := os.Open(p)
if err != nil {
2016-01-14 21:35:42 +00:00
return nil, err
2016-01-13 06:06:42 +00:00
}
if _, err := f.Seek(offset, 0); err != nil {
return nil, fmt.Errorf("can't seek to offset %q: %v", offset, err)
}
2016-07-06 00:08:58 +00:00
return f, nil
}
2016-07-10 22:56:13 +00:00
// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
2016-07-18 16:48:29 +00:00
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
2016-07-06 00:08:58 +00:00
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
2016-07-10 22:56:13 +00:00
watcher := getFileWatcher(p)
2016-07-18 16:48:29 +00:00
returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
2016-07-06 00:08:58 +00:00
}
2016-07-10 22:56:13 +00:00
// ChangeEvents watches for changes to the passed path relative to the
// allocation directory. The offset should be the last read offset. The tomb is
// used to clean up the watch.
2016-07-06 00:08:58 +00:00
func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
2016-07-10 22:56:13 +00:00
watcher := getFileWatcher(p)
return watcher.ChangeEvents(t, curOffset)
}
2016-07-06 00:08:58 +00:00
2016-07-10 22:56:13 +00:00
// getFileWatcher returns a FileWatcher for the given path.
func getFileWatcher(path string) watch.FileWatcher {
2016-08-12 01:59:48 +00:00
return watch.NewPollingFileWatcher(path)
2016-07-06 00:08:58 +00:00
}
// fileCopy copies the contents of src into dst. dst is created with
// permission bits perm if it does not exist; an existing dst is truncated so
// that no stale bytes remain after the copied data. If dst already refers to
// the same file as src (for example through an earlier hard link), nothing is
// done. Both files are closed before returning, and a failure to close dst is
// reported since it can mean the data was not fully written.
func fileCopy(src, dst string, perm os.FileMode) error {
	srcFile, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("Couldn't open src file %v: %v", src, err)
	}
	defer srcFile.Close()

	srcInfo, err := srcFile.Stat()
	if err != nil {
		return fmt.Errorf("Couldn't stat src file %v: %v", src, err)
	}

	// Truncating dst while it is the same file as src would destroy the
	// source data, so treat that case as already copied.
	if dstInfo, err := os.Stat(dst); err == nil && os.SameFile(srcInfo, dstInfo) {
		return nil
	}

	dstFile, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
	if err != nil {
		return fmt.Errorf("Couldn't create destination file %v: %v", dst, err)
	}

	if _, err := io.Copy(dstFile, srcFile); err != nil {
		dstFile.Close()
		return fmt.Errorf("Couldn't copy %v to %v: %v", src, dst, err)
	}

	// Close explicitly (not deferred) so a write-back failure is surfaced.
	if err := dstFile.Close(); err != nil {
		return fmt.Errorf("Couldn't copy %v to %v: %v", src, dst, err)
	}

	return nil
}
// pathExists is a helper function to check if the path exists. It reports
// false only when os.Stat fails with a "does not exist" error; a successful
// stat, or any other stat error, is treated as the path existing.
func (d *AllocDir) pathExists(path string) bool {
	_, err := os.Stat(path)
	return !os.IsNotExist(err)
}