open-nomad/e2e/csi/csi.go
Tim Gross cd1c6173f4 csi: e2e tests for EBS and EFS plugins (#7343)
This changeset provides two basic e2e tests for CSI plugins targeting
common AWS use cases.

The EBS test launches the EBS plugin (controller + nodes) and registers
an EBS volume as a Nomad CSI volume. We deploy a job that writes to
the volume, stop that job, and reuse the volume for another job which
should be able to read the data written by the first job.

The EFS test launches the EFS plugin (nodes-only) and registers an EFS
volume as a Nomad CSI volume. We deploy a job that writes to the
volume, stop that job, and reuse the volume for another job which
should be able to read the data written by the first job.

The writer jobs mount the CSI volume at a location within the alloc
dir.
2020-03-23 13:59:18 -04:00
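For reference, the BeforeAll hook in the file below reads a csi/input/volumes.json document produced from Terraform output. The following is a minimal, self-contained sketch of what that document is expected to look like, inferred from the volumeConfig struct tags in this file; the volume IDs shown are placeholders, not real AWS resources, and the main wrapper exists only to make the sketch runnable.

// Sketch (not part of csi.go): shows the assumed shape of
// csi/input/volumes.json by unmarshaling an example document into the
// same struct the test uses. The IDs are placeholder values.
package main

import (
	"encoding/json"
	"fmt"
)

type volumeConfig struct {
	EBSVolumeID string `json:"ebs_volume"`
	EFSVolumeID string `json:"efs_volume"`
}

func main() {
	raw := []byte(`{
		"ebs_volume": "vol-0123456789abcdef0",
		"efs_volume": "fs-01234567"
	}`)

	cfg := &volumeConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.EBSVolumeID, cfg.EFSVolumeID)
}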

package csi

import (
	"bytes"
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"time"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/e2e/e2eutil"
	"github.com/hashicorp/nomad/e2e/framework"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/stretchr/testify/require"
)
type CSIVolumesTest struct {
	framework.TC
	jobIds    []string
	volumeIDs *volumeConfig
}

func init() {
	framework.AddSuites(&framework.TestSuite{
		Component:   "CSI",
		CanRunLocal: true,
		Consul:      false,
		Cases: []framework.TestCase{
			new(CSIVolumesTest),
		},
	})
}

type volumeConfig struct {
	EBSVolumeID string `json:"ebs_volume"`
	EFSVolumeID string `json:"efs_volume"`
}
func (tc *CSIVolumesTest) BeforeAll(f *framework.F) {
	t := f.T()

	// The volume IDs come from the external provider, so we need
	// to read the configuration out of our Terraform output.
	rawjson, err := ioutil.ReadFile("csi/input/volumes.json")
	if err != nil {
		t.Skip("volume ID configuration not found, try running 'terraform output volumes > ../csi/input/volumes.json'")
	}
	volumeIDs := &volumeConfig{}
	err = json.Unmarshal(rawjson, volumeIDs)
	if err != nil {
		t.Fatal("volume ID configuration could not be read")
	}

	tc.volumeIDs = volumeIDs

	// Ensure cluster has leader and at least two client
	// nodes in a ready state before running tests
	e2eutil.WaitForLeader(t, tc.Nomad())
	e2eutil.WaitForNodesReady(t, tc.Nomad(), 2)
}
// TestEBSVolumeClaim launches AWS EBS plugins and registers an EBS volume
// as a Nomad CSI volume. We then deploy a job that writes to the volume,
// stop that job, and reuse the volume for another job which should be able
// to read the data written by the first job.
func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) {
	t := f.T()
	require := require.New(t)
	nomadClient := tc.Nomad()
	uuid := uuid.Generate()

	// deploy the controller plugin job
	controllerJobID := "aws-ebs-plugin-controller-" + uuid[0:8]
	tc.jobIds = append(tc.jobIds, controllerJobID)
	e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/plugin-aws-ebs-controller.nomad", controllerJobID, "")

	// deploy the node plugins job
	nodesJobID := "aws-ebs-plugin-nodes-" + uuid[0:8]
	tc.jobIds = append(tc.jobIds, nodesJobID)
	e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/plugin-aws-ebs-nodes.nomad", nodesJobID, "")

	// wait for plugin to become healthy
	require.Eventually(func() bool {
		plugin, _, err := nomadClient.CSIPlugins().Info("aws-ebs0", nil)
		if err != nil {
			return false
		}
		if plugin.ControllersHealthy != 1 || plugin.NodesHealthy < 2 {
			return false
		}
		return true
		// TODO(tgross): cut down this time after fixing
		// https://github.com/hashicorp/nomad/issues/7296
	}, 90*time.Second, 5*time.Second)

	// register a volume
	volID := "ebs-vol0"
	vol := &api.CSIVolume{
		ID:             volID,
		Name:           volID,
		ExternalID:     tc.volumeIDs.EBSVolumeID,
		AccessMode:     "single-node-writer",
		AttachmentMode: "file-system",
		PluginID:       "aws-ebs0",
	}
	_, err := nomadClient.CSIVolumes().Register(vol, nil)
	require.NoError(err)
	defer nomadClient.CSIVolumes().Deregister(volID, nil)

	// deploy a job that writes to the volume
	writeJobID := "write-ebs-" + uuid[0:8]
	tc.jobIds = append(tc.jobIds, writeJobID)
	writeAllocs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/use-ebs-volume.nomad", writeJobID, "")
	writeAllocID := writeAllocs[0].ID
	e2eutil.WaitForAllocRunning(t, nomadClient, writeAllocID)

	// read data from volume and assert the writer wrote a file to it
	writeAlloc, _, err := nomadClient.Allocations().Info(writeAllocID, nil)
	require.NoError(err)
	expectedPath := "/local/test/" + writeAllocID
	_, err = readFile(nomadClient, writeAlloc, expectedPath)
	require.NoError(err)

	// Shutdown the writer so we can run a reader.
	// we could mount the EBS volume with multi-attach, but we
	// want this test to exercise the unpublish workflow.
	nomadClient.Jobs().Deregister(writeJobID, true, nil)

	// deploy a job so we can read from the volume
	readJobID := "read-ebs-" + uuid[0:8]
	tc.jobIds = append(tc.jobIds, readJobID)
	readAllocs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/use-ebs-volume.nomad", readJobID, "")
	readAllocID := readAllocs[0].ID
	e2eutil.WaitForAllocRunning(t, nomadClient, readAllocID)

	// ensure we clean up claim before we deregister volumes
	defer nomadClient.Jobs().Deregister(readJobID, true, nil)

	// read data from volume and assert the writer wrote a file to it
	readAlloc, _, err := nomadClient.Allocations().Info(readAllocID, nil)
	require.NoError(err)
	_, err = readFile(nomadClient, readAlloc, expectedPath)
	require.NoError(err)
}
// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume
// as a Nomad CSI volume. We then deploy a job that writes to the volume,
// and share the volume with another job which should be able to read the
// data written by the first job.
func (tc *CSIVolumesTest) TestEFSVolumeClaim(f *framework.F) {
	t := f.T()
	require := require.New(t)
	nomadClient := tc.Nomad()
	uuid := uuid.Generate()

	// deploy the node plugins job (no need for a controller for EFS)
	nodesJobID := "aws-efs-plugin-nodes-" + uuid[0:8]
	tc.jobIds = append(tc.jobIds, nodesJobID)
	e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/plugin-aws-efs-nodes.nomad", nodesJobID, "")

	// wait for plugin to become healthy
	require.Eventually(func() bool {
		plugin, _, err := nomadClient.CSIPlugins().Info("aws-efs0", nil)
		if err != nil {
			return false
		}
		if plugin.NodesHealthy < 2 {
			return false
		}
		return true
		// TODO(tgross): cut down this time after fixing
		// https://github.com/hashicorp/nomad/issues/7296
	}, 90*time.Second, 5*time.Second)

	// register a volume
	volID := "efs-vol0"
	vol := &api.CSIVolume{
		ID:             volID,
		Name:           volID,
		ExternalID:     tc.volumeIDs.EFSVolumeID,
		AccessMode:     "single-node-writer",
		AttachmentMode: "file-system",
		PluginID:       "aws-efs0",
	}
	_, err := nomadClient.CSIVolumes().Register(vol, nil)
	require.NoError(err)
	defer nomadClient.CSIVolumes().Deregister(volID, nil)

	// deploy a job that writes to the volume
	writeJobID := "write-efs-" + uuid[0:8]
	writeAllocs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/use-efs-volume-write.nomad", writeJobID, "")
	writeAllocID := writeAllocs[0].ID
	e2eutil.WaitForAllocRunning(t, nomadClient, writeAllocID)

	// read data from volume and assert the writer wrote a file to it
	writeAlloc, _, err := nomadClient.Allocations().Info(writeAllocID, nil)
	require.NoError(err)
	expectedPath := "/local/test/" + writeAllocID
	_, err = readFile(nomadClient, writeAlloc, expectedPath)
	require.NoError(err)

	// Shutdown the writer so we can run a reader.
	// although EFS should support multiple readers, the plugin
	// does not.
	nomadClient.Jobs().Deregister(writeJobID, true, nil)

	// deploy a job that reads from the volume.
	readJobID := "read-efs-" + uuid[0:8]
	readAllocs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient,
		"csi/input/use-efs-volume-read.nomad", readJobID, "")
	defer nomadClient.Jobs().Deregister(readJobID, true, nil)
	e2eutil.WaitForAllocRunning(t, nomadClient, readAllocs[0].ID)

	// read data from volume and assert the writer wrote a file to it
	readAlloc, _, err := nomadClient.Allocations().Info(readAllocs[0].ID, nil)
	require.NoError(err)
	_, err = readFile(nomadClient, readAlloc, expectedPath)
	require.NoError(err)
}
func (tc *CSIVolumesTest) AfterEach(f *framework.F) {
	nomadClient := tc.Nomad()
	jobs := nomadClient.Jobs()
	// Stop all jobs in test
	for _, id := range tc.jobIds {
		jobs.Deregister(id, true, nil)
	}
	// Garbage collect
	nomadClient.System().GarbageCollect()
}
// TODO(tgross): replace this w/ AllocFS().Stat() after
// https://github.com/hashicorp/nomad/issues/7365 is fixed
func readFile(client *api.Client, alloc *api.Allocation, path string) (bytes.Buffer, error) {
	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFn()

	var stdout, stderr bytes.Buffer
	_, err := client.Allocations().Exec(ctx,
		alloc, "task", false,
		[]string{"cat", path},
		os.Stdin, &stdout, &stderr,
		make(chan api.TerminalSize), nil)
	return stdout, err
}