Setting the appropriate file permissions which un-archiving compressed alloc dir
This commit is contained in:
parent
bc17cacca0
commit
cbf73908ff
|
@ -1578,6 +1578,18 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
|
||||||
return nil, fmt.Errorf("error getting snapshot for alloc %v: %v", alloc.ID, err)
|
return nil, fmt.Errorf("error getting snapshot for alloc %v: %v", alloc.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.unarchiveAllocDir(resp, allocID, pathToAllocDir); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there were no errors then we create the allocdir
|
||||||
|
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir)
|
||||||
|
return prevAllocDir, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// unarchiveAllocDir reads the stream of a compressed allocation directory and
|
||||||
|
// writes them to the disk.
|
||||||
|
func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAllocDir string) error {
|
||||||
tr := tar.NewReader(resp)
|
tr := tar.NewReader(resp)
|
||||||
defer resp.Close()
|
defer resp.Close()
|
||||||
|
|
||||||
|
@ -1586,40 +1598,38 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
|
||||||
stopMigrating, ok := c.migratingAllocs[allocID]
|
stopMigrating, ok := c.migratingAllocs[allocID]
|
||||||
if !ok {
|
if !ok {
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
return nil, fmt.Errorf("couldn't find a migration validity notifier for alloc: %v", alloc.ID)
|
return fmt.Errorf("Allocation %q is not marked for remote migration: %v", allocID)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
// See if the alloc still needs migration
|
// See if the alloc still needs migration
|
||||||
select {
|
select {
|
||||||
case <-stopMigrating:
|
case <-stopMigrating:
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", alloc.ID)
|
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
|
||||||
return nil, nil
|
return nil
|
||||||
case <-c.shutdownCh:
|
case <-c.shutdownCh:
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", alloc.ID)
|
c.logger.Printf("[INFO] client: stopping migration of alloc %q since client is shutting down", allocID)
|
||||||
return nil, nil
|
return nil
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the next header
|
// Get the next header
|
||||||
hdr, err := tr.Next()
|
hdr, err := tr.Next()
|
||||||
|
|
||||||
// If the snapshot has ended then we create the previous
|
// Snapshot has ended
|
||||||
// allocdir
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
prevAllocDir := allocdir.NewAllocDir(pathToAllocDir)
|
return nil
|
||||||
return prevAllocDir, nil
|
|
||||||
}
|
}
|
||||||
// If there is an error then we avoid creating the alloc dir
|
// If there is an error then we avoid creating the alloc dir
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
return nil, fmt.Errorf("error creating alloc dir for alloc %q: %v", alloc.ID, err)
|
return fmt.Errorf("error creating alloc dir for alloc %q: %v", allocID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the header is for a directory we create the directory
|
// If the header is for a directory we create the directory
|
||||||
if hdr.Typeflag == tar.TypeDir {
|
if hdr.Typeflag == tar.TypeDir {
|
||||||
os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), 0777)
|
os.MkdirAll(filepath.Join(pathToAllocDir, hdr.Name), os.FileMode(hdr.Mode))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// If the header is a file, we write to a file
|
// If the header is a file, we write to a file
|
||||||
|
@ -1630,33 +1640,47 @@ func (c *Client) migrateRemoteAllocDir(alloc *structs.Allocation, allocID string
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Setting the permissions of the file as the origin.
|
||||||
|
if err := f.Chmod(os.FileMode(hdr.Mode)); err != nil {
|
||||||
|
f.Close()
|
||||||
|
c.logger.Printf("[ERR] client: error chmod-ing file %s: %v", f.Name(), err)
|
||||||
|
return fmt.Errorf("error chmoding file %v", err)
|
||||||
|
}
|
||||||
|
if err := f.Chown(hdr.Uid, hdr.Gid); err != nil {
|
||||||
|
f.Close()
|
||||||
|
c.logger.Printf("[ERR] client: error chown-ing file %s: %v", f.Name(), err)
|
||||||
|
return fmt.Errorf("error chowning file %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// We write in chunks of 32 bytes so that we can test if
|
// We write in chunks of 32 bytes so that we can test if
|
||||||
// the client is still alive
|
// the client is still alive
|
||||||
for {
|
for {
|
||||||
if c.shutdown {
|
if c.shutdown {
|
||||||
f.Close()
|
f.Close()
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", alloc.ID)
|
c.logger.Printf("[INFO] client: stopping migration of alloc %q because client is shutting down", allocID)
|
||||||
return nil, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := tr.Read(buf)
|
n, err := tr.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
return nil, fmt.Errorf("error reading snapshot: %v", err)
|
return fmt.Errorf("error reading snapshot: %v", err)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if _, err := f.Write(buf[:n]); err != nil {
|
if _, err := f.Write(buf[:n]); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
os.RemoveAll(pathToAllocDir)
|
os.RemoveAll(pathToAllocDir)
|
||||||
return nil, fmt.Errorf("error writing to file %q: %v", f.Name(), err)
|
return fmt.Errorf("error writing to file %q: %v", f.Name(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getNode gets the node from the server with the given Node ID
|
// getNode gets the node from the server with the given Node ID
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"archive/tar"
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
@ -804,3 +807,111 @@ func TestClient_BlockedAllocations(t *testing.T) {
|
||||||
c1.allocLock.Unlock()
|
c1.allocLock.Unlock()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClient_UnarchiveAllocDir(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
if err := os.Mkdir(filepath.Join(dir, "foo"), 0777); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
dirInfo, err := os.Stat(filepath.Join(dir, "foo"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
f, err := os.Create(filepath.Join(dir, "foo", "bar"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := f.WriteString("foo"); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if err := f.Chmod(0644); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
fInfo, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
f.Close()
|
||||||
|
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
tw := tar.NewWriter(buf)
|
||||||
|
|
||||||
|
walkFn := func(path string, fileInfo os.FileInfo, err error) error {
|
||||||
|
// Ignore if the file is a symlink
|
||||||
|
if fileInfo.Mode() == os.ModeSymlink {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Include the path of the file name relative to the alloc dir
|
||||||
|
// so that we can put the files in the right directories
|
||||||
|
hdr, err := tar.FileInfoHeader(fileInfo, "")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating file header: %v", err)
|
||||||
|
}
|
||||||
|
hdr.Name = fileInfo.Name()
|
||||||
|
tw.WriteHeader(hdr)
|
||||||
|
|
||||||
|
// If it's a directory we just write the header into the tar
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the file into the archive
|
||||||
|
file, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
if _, err := io.Copy(tw, file); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := filepath.Walk(dir, walkFn); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
tw.Close()
|
||||||
|
|
||||||
|
dir1, err := ioutil.TempDir("", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
|
||||||
|
c1 := testClient(t, func(c *config.Config) {
|
||||||
|
c.RPCHandler = nil
|
||||||
|
})
|
||||||
|
defer c1.Shutdown()
|
||||||
|
|
||||||
|
rc := ioutil.NopCloser(buf)
|
||||||
|
|
||||||
|
c1.migratingAllocs["123"] = make(chan struct{})
|
||||||
|
if err := c1.unarchiveAllocDir(rc, "123", dir1); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure foo is present
|
||||||
|
fi, err := os.Stat(filepath.Join(dir1, "foo"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if fi.Mode() != dirInfo.Mode() {
|
||||||
|
t.Fatalf("mode: %v", fi.Mode())
|
||||||
|
}
|
||||||
|
|
||||||
|
fi1, err := os.Stat(filepath.Join(dir1, "bar"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if fi1.Mode() != fInfo.Mode() {
|
||||||
|
t.Fatalf("mode: %v", fi1.Mode())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue