E2E: extend CSI test to cover create and snapshot workflows
Split the EBS and EFS tests out into their own test cases:

* EBS exercises the Controller RPCs, including the create/snapshot workflow.
* EFS exercises only the Node RPCs, and assumes we have an existing volume
  that gets registered, rather than created.
parent d2a9cd93a8
commit da89103c5c

e2e/csi/csi.go (316 changed lines)
@@ -13,29 +13,20 @@ import (
     "strings"
     "time"
 
-    "github.com/stretchr/testify/require"
-
     "github.com/hashicorp/nomad/api"
     e2e "github.com/hashicorp/nomad/e2e/e2eutil"
     "github.com/hashicorp/nomad/e2e/framework"
-    "github.com/hashicorp/nomad/helper/uuid"
     "github.com/hashicorp/nomad/testutil"
 )
 
-type CSIVolumesTest struct {
-    framework.TC
-    testJobIDs   []string
-    volumeIDs    []string
-    pluginJobIDs []string
-}
-
 func init() {
     framework.AddSuites(&framework.TestSuite{
         Component: "CSI",
         CanRunLocal: true,
         Consul: false,
         Cases: []framework.TestCase{
-            new(CSIVolumesTest),
+            new(CSIControllerPluginEBSTest), // see ebs.go
+            new(CSINodeOnlyPluginEFSTest),   // see efs.go
         },
     })
 }
@@ -45,269 +36,27 @@ const ns = ""
 var pluginWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min
 var reapWait = &e2e.WaitConfig{Interval: 5 * time.Second, Retries: 36} // 3min
 
-func (tc *CSIVolumesTest) BeforeAll(f *framework.F) {
-    t := f.T()
-
-    _, err := os.Stat("csi/input/volume-ebs.hcl")
+// assertNoErrorElseDump calls a non-halting assert on the error and dumps the
+// plugin logs if it fails.
+func assertNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) {
     if err != nil {
-        t.Skip("skipping CSI test because EBS volume spec file missing:", err)
+        dumpLogs(pluginJobIDs)
+        f.Assert().NoError(err, fmt.Sprintf("%v: %v", msg, err))
     }
+}
 
-    _, err = os.Stat("csi/input/volume-efs.hcl")
+// requireNoErrorElseDump calls a halting assert on the error and dumps the
+// plugin logs if it fails.
+func requireNoErrorElseDump(f *framework.F, err error, msg string, pluginJobIDs []string) {
     if err != nil {
-        t.Skip("skipping CSI test because EFS volume spec file missing:", err)
+        dumpLogs(pluginJobIDs)
+        f.NoError(err, fmt.Sprintf("%v: %v", msg, err))
     }
-
-    // Ensure cluster has leader and at least two client
-    // nodes in a ready state before running tests
-    e2e.WaitForLeader(t, tc.Nomad())
-    e2e.WaitForNodesReady(t, tc.Nomad(), 2)
 }
 
-// TestEBSVolumeClaim launches AWS EBS plugins and registers an EBS volume
-// as a Nomad CSI volume. We then deploy a job that writes to the volume,
-// stop that job, and reuse the volume for another job which should be able
-// to read the data written by the first job.
-func (tc *CSIVolumesTest) TestEBSVolumeClaim(f *framework.F) {
-    t := f.T()
-    require := require.New(t)
-    nomadClient := tc.Nomad()
-    uuid := uuid.Generate()
-    pluginID := "aws-ebs0"
+func dumpLogs(pluginIDs []string) error {
 
-    // deploy the controller plugin job
-    controllerJobID := "aws-ebs-plugin-controller-" + uuid[0:8]
-    f.NoError(e2e.Register(controllerJobID, "csi/input/plugin-aws-ebs-controller.nomad"))
-    tc.pluginJobIDs = append(tc.pluginJobIDs, controllerJobID)
-    expected := []string{"running", "running"}
-    f.NoError(
-        e2e.WaitForAllocStatusExpected(controllerJobID, ns, expected),
-        "job should be running")
-
-    // deploy the node plugins job
-    nodesJobID := "aws-ebs-plugin-nodes-" + uuid[0:8]
-    f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-ebs-nodes.nomad"))
-    tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
-
-    f.NoError(e2e.WaitForAllocStatusComparison(
-        func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
-        func(got []string) bool {
-            for _, status := range got {
-                if status != "running" {
-                    return false
-                }
-            }
-            return true
-        }, nil,
-    ))
-
-    f.NoError(waitForPluginStatusControllerCount(pluginID, 2, pluginWait),
-        "aws-ebs0 controller plugins did not become healthy")
-    f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait),
-        "aws-ebs0 node plugins did not become healthy")
-
-    // register a volume
-    volID := "ebs-vol[0]"
-    err := volumeRegister(volID, "csi/input/volume-ebs.hcl")
-    require.NoError(err)
-    tc.volumeIDs = append(tc.volumeIDs, volID)
-
-    // deploy a job that writes to the volume
-    writeJobID := "write-ebs-" + uuid[0:8]
-    f.NoError(e2e.Register(writeJobID, "csi/input/use-ebs-volume.nomad"))
-    f.NoError(
-        e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
-        "job should be running")
-
-    allocs, err := e2e.AllocsForJob(writeJobID, ns)
-    f.NoError(err, "could not get allocs for write job")
-    f.Len(allocs, 1, "could not get allocs for write job")
-    writeAllocID := allocs[0]["ID"]
-
-    // read data from volume and assert the writer wrote a file to it
-    expectedPath := "/task/test/" + writeAllocID
-    _, err = readFile(nomadClient, writeAllocID, expectedPath)
-    require.NoError(err)
-
-    // Shutdown (and purge) the writer so we can run a reader.
-    // we could mount the EBS volume with multi-attach, but we
-    // want this test to exercise the unpublish workflow.
-    _, err = e2e.Command("nomad", "job", "stop", "-purge", writeJobID)
-    require.NoError(err)
-
-    // wait for the volume unpublish workflow to complete
-    require.NoError(waitForVolumeClaimRelease(volID, reapWait),
-        "write-ebs alloc claim was not released")
-
-    // deploy a job so we can read from the volume
-    readJobID := "read-ebs-" + uuid[0:8]
-    tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
-    f.NoError(e2e.Register(readJobID, "csi/input/use-ebs-volume.nomad"))
-    f.NoError(
-        e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
-        "job should be running")
-
-    allocs, err = e2e.AllocsForJob(readJobID, ns)
-    f.NoError(err, "could not get allocs for read job")
-    f.Len(allocs, 1, "could not get allocs for read job")
-    readAllocID := allocs[0]["ID"]
-
-    // read data from volume and assert we can read the file the writer wrote
-    expectedPath = "/task/test/" + readAllocID
-    _, err = readFile(nomadClient, readAllocID, expectedPath)
-    require.NoError(err)
-
-}
-
-// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume
-// as a Nomad CSI volume. We then deploy a job that writes to the volume,
-// and share the volume with another job which should be able to read the
-// data written by the first job.
-func (tc *CSIVolumesTest) TestEFSVolumeClaim(f *framework.F) {
-    t := f.T()
-    require := require.New(t)
-    nomadClient := tc.Nomad()
-    uuid := uuid.Generate()
-    pluginID := "aws-efs0"
-
-    // deploy the node plugins job (no need for a controller for EFS)
-    nodesJobID := "aws-efs-plugin-nodes-" + uuid[0:8]
-    f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad"))
-    tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
-
-    f.NoError(e2e.WaitForAllocStatusComparison(
-        func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
-        func(got []string) bool {
-            for _, status := range got {
-                if status != "running" {
-                    return false
-                }
-            }
-            return true
-        }, nil,
-    ))
-
-    f.NoError(waitForPluginStatusMinNodeCount(pluginID, 2, pluginWait),
-        "aws-efs0 node plugins did not become healthy")
-
-    // register a volume
-    volID := "efs-vol0"
-    err := volumeRegister(volID, "csi/input/volume-efs.hcl")
-    require.NoError(err)
-    tc.volumeIDs = append(tc.volumeIDs, volID)
-
-    // deploy a job that writes to the volume
-    writeJobID := "write-efs-" + uuid[0:8]
-    tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up
-    f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad"))
-    f.NoError(
-        e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
-        "job should be running")
-
-    allocs, err := e2e.AllocsForJob(writeJobID, ns)
-    f.NoError(err, "could not get allocs for write job")
-    f.Len(allocs, 1, "could not get allocs for write job")
-    writeAllocID := allocs[0]["ID"]
-
-    // read data from volume and assert the writer wrote a file to it
-    expectedPath := "/task/test/" + writeAllocID
-    _, err = readFile(nomadClient, writeAllocID, expectedPath)
-    require.NoError(err)
-
-    // Shutdown the writer so we can run a reader.
-    // although EFS should support multiple readers, the plugin
-    // does not.
-    _, err = e2e.Command("nomad", "job", "stop", writeJobID)
-    require.NoError(err)
-
-    // wait for the volume unpublish workflow to complete
-    require.NoError(waitForVolumeClaimRelease(volID, reapWait),
-        "write-efs alloc claim was not released")
-
-    // deploy a job that reads from the volume
-    readJobID := "read-efs-" + uuid[0:8]
-    tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
-    f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad"))
-    f.NoError(
-        e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
-        "job should be running")
-
-    allocs, err = e2e.AllocsForJob(readJobID, ns)
-    f.NoError(err, "could not get allocs for read job")
-    f.Len(allocs, 1, "could not get allocs for read job")
-    readAllocID := allocs[0]["ID"]
-
-    // read data from volume and assert the writer wrote a file to it
-    require.NoError(err)
-    _, err = readFile(nomadClient, readAllocID, expectedPath)
-    require.NoError(err)
-}
-
-func (tc *CSIVolumesTest) AfterEach(f *framework.F) {
-
-    // Stop all jobs in test
-    for _, id := range tc.testJobIDs {
-        out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
-        f.Assert().NoError(err, out)
-    }
-    tc.testJobIDs = []string{}
-
-    // Deregister all volumes in test
-    for _, id := range tc.volumeIDs {
-        // make sure all the test jobs have finished unpublishing claims
-        err := waitForVolumeClaimRelease(id, reapWait)
-        f.Assert().NoError(err, "volume claims were not released")
-
-        out, err := e2e.Command("nomad", "volume", "deregister", id)
-        if err != nil {
-            fmt.Println("could not deregister volume, dumping allocation logs")
-            f.Assert().NoError(tc.dumpLogs())
-        }
-        f.Assert().NoError(err, out)
-    }
-    tc.volumeIDs = []string{}
-
-    // Deregister all plugin jobs in test
-    for _, id := range tc.pluginJobIDs {
-        out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
-        f.Assert().NoError(err, out)
-    }
-    tc.pluginJobIDs = []string{}
-
-    // Garbage collect
-    out, err := e2e.Command("nomad", "system", "gc")
-    f.Assert().NoError(err, out)
-}
-
-// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume
-// that's in the process of being unpublished. we can't just wait for allocs
-// to stop, but need to wait for their claims to be released
-func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error {
-    var out string
-    var err error
-    testutil.WaitForResultRetries(wc.Retries, func() (bool, error) {
-        time.Sleep(wc.Interval)
-        out, err = e2e.Command("nomad", "volume", "status", volID)
-        if err != nil {
-            return false, err
-        }
-        section, err := e2e.GetSection(out, "Allocations")
-        if err != nil {
-            return false, err
-        }
-        return strings.Contains(section, "No allocations placed"), nil
-    }, func(e error) {
-        if e == nil {
-            err = nil
-        }
-        err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out)
-    })
-    return err
-}
-
-func (tc *CSIVolumesTest) dumpLogs() error {
-
-    for _, id := range tc.pluginJobIDs {
+    for _, id := range pluginIDs {
         allocs, err := e2e.AllocsForJob(id, ns)
         if err != nil {
             return fmt.Errorf("could not find allocs for plugin: %v", err)
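
The only difference between the two new helpers is whether the test keeps running after a failure: f.Assert().NoError records the failure and continues (the "non-halting assert"), while f.NoError stops the test case (the "halting assert"). A minimal, illustrative sketch of that distinction using plain testify, outside the e2e framework:

// Illustrative sketch only; not part of this commit. It uses the
// stretchr/testify assert and require packages directly to show the
// halting vs non-halting behavior the helpers rely on.
package csi_sketch

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestHaltingVsNonHalting(t *testing.T) {
    err := step() // stands in for any step that can fail, e.g. a `nomad volume` command
    assert.NoError(t, err)  // non-halting: failure is recorded, the test continues
    require.NoError(t, err) // halting: failure stops this test immediately
}

func step() error { return nil }
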
@@ -340,6 +89,32 @@ func (tc *CSIVolumesTest) dumpLogs() error {
     return nil
 }
 
+// waitForVolumeClaimRelease makes sure we don't try to re-claim a volume
+// that's in the process of being unpublished. we can't just wait for allocs
+// to stop, but need to wait for their claims to be released
+func waitForVolumeClaimRelease(volID string, wc *e2e.WaitConfig) error {
+    var out string
+    var err error
+    testutil.WaitForResultRetries(wc.Retries, func() (bool, error) {
+        time.Sleep(wc.Interval)
+        out, err = e2e.Command("nomad", "volume", "status", volID)
+        if err != nil {
+            return false, err
+        }
+        section, err := e2e.GetSection(out, "Allocations")
+        if err != nil {
+            return false, err
+        }
+        return strings.Contains(section, "No allocations placed"), nil
+    }, func(e error) {
+        if e == nil {
+            err = nil
+        }
+        err = fmt.Errorf("alloc claim was not released: %v\n%s", e, out)
+    })
+    return err
+}
+
 // TODO(tgross): replace this w/ AllocFS().Stat() after
 // https://github.com/hashicorp/nomad/issues/7365 is fixed
 func readFile(client *api.Client, allocID string, path string) (bytes.Buffer, error) {
@@ -434,11 +209,12 @@ func waitForPluginStatusCompare(pluginID string, compare func(got string) (bool,
     return err
 }
 
-// VolumeRegister registers a jobspec from a file but with a unique ID.
-// The caller is responsible for recording that ID for later cleanup.
-func volumeRegister(volID, volFilePath string) error {
+// volumeRegister creates or registers a volume spec from a file but with a
+// unique ID. The caller is responsible for recording that ID for later
+// cleanup.
+func volumeRegister(volID, volFilePath, createOrRegister string) error {
 
-    cmd := exec.Command("nomad", "volume", "register", "-")
+    cmd := exec.Command("nomad", "volume", createOrRegister, "-")
     stdin, err := cmd.StdinPipe()
     if err != nil {
         return fmt.Errorf("could not open stdin?: %w", err)
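
With the new createOrRegister argument, the same helper can drive either `nomad volume create` (used by the EBS test, where the controller plugin provisions the volume) or `nomad volume register` (used by the EFS test, where the filesystem already exists). A standalone sketch of that call pattern, with a hypothetical runVolumeCmd helper standing in for the e2e plumbing:

// Illustrative sketch only; not part of this commit. It mirrors the shape
// of the updated volumeRegister helper: pipe a volume spec file to either
// `nomad volume create -` or `nomad volume register -`.
package main

import (
    "bytes"
    "fmt"
    "os"
    "os/exec"
)

func runVolumeCmd(createOrRegister, specPath string) error {
    spec, err := os.ReadFile(specPath)
    if err != nil {
        return fmt.Errorf("could not read volume spec: %w", err)
    }
    cmd := exec.Command("nomad", "volume", createOrRegister, "-")
    cmd.Stdin = bytes.NewReader(spec)
    out, err := cmd.CombinedOutput()
    if err != nil {
        return fmt.Errorf("volume %s failed: %v\n%s", createOrRegister, err, out)
    }
    return nil
}

func main() {
    // EBS volumes are provisioned by the controller plugin, so the test uses "create".
    if err := runVolumeCmd("create", "csi/input/ebs-volume0.hcl"); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
    // The EFS filesystem already exists, so the test only "register"s it.
    if err := runVolumeCmd("register", "csi/input/volume-efs.hcl"); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}
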

e2e/csi/ebs.go (new file, 175 lines)
@@ -0,0 +1,175 @@
+package csi
+
+import (
+    "fmt"
+
+    e2e "github.com/hashicorp/nomad/e2e/e2eutil"
+    "github.com/hashicorp/nomad/e2e/framework"
+    "github.com/hashicorp/nomad/helper/uuid"
+)
+
+// CSIControllerPluginEBSTest exercises the AWS EBS plugin, which is an
+// example of a plugin that supports most of the CSI Controller RPCs.
+type CSIControllerPluginEBSTest struct {
+    framework.TC
+    uuid         string
+    testJobIDs   []string
+    volumeIDs    []string
+    pluginJobIDs []string
+}
+
+const ebsPluginID = "aws-ebs0"
+
+// BeforeAll waits for the cluster to be ready, deploys the CSI plugins, and
+// creates two EBS volumes for use in the test.
+func (tc *CSIControllerPluginEBSTest) BeforeAll(f *framework.F) {
+    e2e.WaitForLeader(f.T(), tc.Nomad())
+    e2e.WaitForNodesReady(f.T(), tc.Nomad(), 2)
+
+    tc.uuid = uuid.Generate()[0:8]
+
+    // deploy the controller plugin job
+    controllerJobID := "aws-ebs-plugin-controller-" + tc.uuid
+    f.NoError(e2e.Register(controllerJobID, "csi/input/plugin-aws-ebs-controller.nomad"))
+    tc.pluginJobIDs = append(tc.pluginJobIDs, controllerJobID)
+    expected := []string{"running", "running"}
+    f.NoError(
+        e2e.WaitForAllocStatusExpected(controllerJobID, ns, expected),
+        "job should be running")
+
+    // deploy the node plugins job
+    nodesJobID := "aws-ebs-plugin-nodes-" + tc.uuid
+    f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-ebs-nodes.nomad"))
+    tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
+
+    f.NoError(e2e.WaitForAllocStatusComparison(
+        func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
+        func(got []string) bool {
+            for _, status := range got {
+                if status != "running" {
+                    return false
+                }
+            }
+            return true
+        }, nil,
+    ))
+
+    f.NoError(waitForPluginStatusControllerCount(ebsPluginID, 2, pluginWait),
+        "aws-ebs0 controller plugins did not become healthy")
+    f.NoError(waitForPluginStatusMinNodeCount(ebsPluginID, 2, pluginWait),
+        "aws-ebs0 node plugins did not become healthy")
+
+    // ideally we'd wait until after we check `nomad volume status -verbose`
+    // to verify these volumes are ready, but the plugin doesn't support the
+    // CSI ListVolumes RPC
+    volID := "ebs-vol[0]"
+    err := volumeRegister(volID, "csi/input/ebs-volume0.hcl", "create")
+    requireNoErrorElseDump(f, err, "could not create volume", tc.pluginJobIDs)
+    tc.volumeIDs = append(tc.volumeIDs, volID)
+
+    volID = "ebs-vol[1]"
+    err = volumeRegister(volID, "csi/input/ebs-volume1.hcl", "create")
+    requireNoErrorElseDump(f, err, "could not create volume", tc.pluginJobIDs)
+    tc.volumeIDs = append(tc.volumeIDs, volID)
+}
+
+// AfterAll cleans up the volumes and plugin jobs created by the test.
+func (tc *CSIControllerPluginEBSTest) AfterAll(f *framework.F) {
+    for _, volID := range tc.volumeIDs {
+        err := waitForVolumeClaimRelease(volID, reapWait)
+        f.Assert().NoError(err, "volume claims were not released")
+
+        out, err := e2e.Command("nomad", "volume", "delete", volID)
+        assertNoErrorElseDump(f, err,
+            fmt.Sprintf("could not delete volume:\n%v", out), tc.pluginJobIDs)
+    }
+
+    // Deregister all plugin jobs in test
+    for _, id := range tc.pluginJobIDs {
+        out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+        f.Assert().NoError(err, out)
+    }
+    tc.pluginJobIDs = []string{}
+
+    // Garbage collect
+    out, err := e2e.Command("nomad", "system", "gc")
+    f.Assert().NoError(err, out)
+
+}
+
+// TestVolumeClaim exercises the volume publish/unpublish workflows for the
+// EBS plugin.
+func (tc *CSIControllerPluginEBSTest) TestVolumeClaim(f *framework.F) {
+    nomadClient := tc.Nomad()
+
+    // deploy a job that writes to the volume
+    writeJobID := "write-ebs-" + tc.uuid
+    f.NoError(e2e.Register(writeJobID, "csi/input/use-ebs-volume.nomad"))
+    f.NoError(
+        e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
+        "job should be running")
+
+    allocs, err := e2e.AllocsForJob(writeJobID, ns)
+    f.NoError(err, "could not get allocs for write job")
+    f.Len(allocs, 1, "could not get allocs for write job")
+    writeAllocID := allocs[0]["ID"]
+
+    // read data from volume and assert the writer wrote a file to it
+    expectedPath := "/task/test/" + writeAllocID
+    _, err = readFile(nomadClient, writeAllocID, expectedPath)
+    f.NoError(err)
+
+    // Shutdown (and purge) the writer so we can run a reader.
+    // we could mount the EBS volume with multi-attach, but we
+    // want this test to exercise the unpublish workflow.
+    _, err = e2e.Command("nomad", "job", "stop", "-purge", writeJobID)
+    f.NoError(err)
+
+    // wait for the volume unpublish workflow to complete
+    for _, volID := range tc.volumeIDs {
+        err := waitForVolumeClaimRelease(volID, reapWait)
+        f.NoError(err, "volume claims were not released")
+    }
+
+    // deploy a job so we can read from the volume
+    readJobID := "read-ebs-" + tc.uuid
+    tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
+    f.NoError(e2e.Register(readJobID, "csi/input/use-ebs-volume.nomad"))
+    f.NoError(
+        e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
+        "job should be running")
+
+    allocs, err = e2e.AllocsForJob(readJobID, ns)
+    f.NoError(err, "could not get allocs for read job")
+    f.Len(allocs, 1, "could not get allocs for read job")
+    readAllocID := allocs[0]["ID"]
+
+    // read data from volume and assert we can read the file the writer wrote
+    expectedPath = "/task/test/" + readAllocID
+    _, err = readFile(nomadClient, readAllocID, expectedPath)
+    f.NoError(err)
+}
+
+// TestSnapshot exercises the snapshot commands.
+func (tc *CSIControllerPluginEBSTest) TestSnapshot(f *framework.F) {
+
+    out, err := e2e.Command("nomad", "volume", "snapshot", "create",
+        tc.volumeIDs[0], "snap-"+tc.uuid)
+    requireNoErrorElseDump(f, err, "could not create volume snapshot", tc.pluginJobIDs)
+
+    snaps, err := e2e.ParseColumns(out)
+
+    defer func() {
+        _, err := e2e.Command("nomad", "volume", "snapshot", "delete",
+            ebsPluginID, snaps[0]["External ID"])
+        requireNoErrorElseDump(f, err, "could not delete volume snapshot", tc.pluginJobIDs)
+    }()
+
+    f.NoError(err, fmt.Sprintf("could not parse output:\n%v", out))
+    f.Len(snaps, 1, fmt.Sprintf("could not parse output:\n%v", out))
+
+    out, err = e2e.Command("nomad", "volume", "snapshot", "list")
+    requireNoErrorElseDump(f, err, "could not list volume snapshots", tc.pluginJobIDs)
+    f.Contains(out, snaps[0]["ID"],
+        fmt.Sprintf("volume snapshot list did not include expected snapshot:\n%v", out))
+}
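
TestSnapshot relies on e2e.ParseColumns to turn the CLI's columnar snapshot output into maps keyed by column header, which is why it can index snaps[0]["ID"] and snaps[0]["External ID"]. A rough, self-contained sketch of that kind of parser, assuming columns are separated by two or more spaces (the real e2eutil helper may differ):

// Illustrative sketch only; not part of this commit or of the e2eutil package.
package csi_sketch

import (
    "regexp"
    "strings"
)

// columnSplit assumes the CLI pads columns with at least two spaces, so
// multi-word headers such as "External ID" stay intact.
var columnSplit = regexp.MustCompile(`\s{2,}`)

func parseColumns(out string) []map[string]string {
    lines := strings.Split(strings.TrimSpace(out), "\n")
    if len(lines) < 2 {
        return nil
    }
    headers := columnSplit.Split(strings.TrimSpace(lines[0]), -1)
    var rows []map[string]string
    for _, line := range lines[1:] {
        fields := columnSplit.Split(strings.TrimSpace(line), -1)
        row := map[string]string{}
        for i, h := range headers {
            if i < len(fields) {
                row[h] = fields[i]
            }
        }
        rows = append(rows, row)
    }
    return rows
}
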

e2e/csi/efs.go (new file, 153 lines)
@@ -0,0 +1,153 @@
+package csi
+
+import (
+    "fmt"
+    "os"
+
+    e2e "github.com/hashicorp/nomad/e2e/e2eutil"
+    "github.com/hashicorp/nomad/e2e/framework"
+    "github.com/hashicorp/nomad/helper/uuid"
+    "github.com/stretchr/testify/require"
+)
+
+// CSINodeOnlyPluginEFSTest exercises the AWS EFS plugin, which is an
+// example of a plugin that can run in Node-only mode.
+type CSINodeOnlyPluginEFSTest struct {
+    framework.TC
+    uuid         string
+    testJobIDs   []string
+    volumeIDs    []string
+    pluginJobIDs []string
+}
+
+const efsPluginID = "aws-efs0"
+
+func (tc *CSINodeOnlyPluginEFSTest) BeforeAll(f *framework.F) {
+    t := f.T()
+
+    _, err := os.Stat("csi/input/volume-efs.hcl")
+    if err != nil {
+        t.Skip("skipping CSI test because EFS volume spec file missing:", err)
+    }
+
+    // Ensure cluster has leader and at least two client
+    // nodes in a ready state before running tests
+    e2e.WaitForLeader(t, tc.Nomad())
+    e2e.WaitForNodesReady(t, tc.Nomad(), 2)
+}
+
+// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume
+// as a Nomad CSI volume. We then deploy a job that writes to the volume,
+// and share the volume with another job which should be able to read the
+// data written by the first job.
+func (tc *CSINodeOnlyPluginEFSTest) TestEFSVolumeClaim(f *framework.F) {
+    t := f.T()
+    require := require.New(t)
+    nomadClient := tc.Nomad()
+    tc.uuid = uuid.Generate()[0:8]
+
+    // deploy the node plugins job (no need for a controller for EFS)
+    nodesJobID := "aws-efs-plugin-nodes-" + tc.uuid
+    f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad"))
+    tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
+
+    f.NoError(e2e.WaitForAllocStatusComparison(
+        func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
+        func(got []string) bool {
+            for _, status := range got {
+                if status != "running" {
+                    return false
+                }
+            }
+            return true
+        }, nil,
+    ))
+
+    f.NoError(waitForPluginStatusMinNodeCount(efsPluginID, 2, pluginWait),
+        "aws-efs0 node plugins did not become healthy")
+
+    // register a volume
+    volID := "efs-vol0"
+    err := volumeRegister(volID, "csi/input/volume-efs.hcl", "register")
+    require.NoError(err)
+    tc.volumeIDs = append(tc.volumeIDs, volID)
+
+    // deploy a job that writes to the volume
+    writeJobID := "write-efs-" + tc.uuid
+    tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up
+    f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad"))
+    f.NoError(
+        e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
+        "job should be running")
+
+    allocs, err := e2e.AllocsForJob(writeJobID, ns)
+    f.NoError(err, "could not get allocs for write job")
+    f.Len(allocs, 1, "could not get allocs for write job")
+    writeAllocID := allocs[0]["ID"]
+
+    // read data from volume and assert the writer wrote a file to it
+    expectedPath := "/task/test/" + writeAllocID
+    _, err = readFile(nomadClient, writeAllocID, expectedPath)
+    require.NoError(err)
+
+    // Shutdown the writer so we can run a reader.
+    // although EFS should support multiple readers, the plugin
+    // does not.
+    _, err = e2e.Command("nomad", "job", "stop", writeJobID)
+    require.NoError(err)
+
+    // wait for the volume unpublish workflow to complete
+    require.NoError(waitForVolumeClaimRelease(volID, reapWait),
+        "write-efs alloc claim was not released")
+
+    // deploy a job that reads from the volume
+    readJobID := "read-efs-" + tc.uuid
+    tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
+    f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad"))
+    f.NoError(
+        e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
+        "job should be running")
+
+    allocs, err = e2e.AllocsForJob(readJobID, ns)
+    f.NoError(err, "could not get allocs for read job")
+    f.Len(allocs, 1, "could not get allocs for read job")
+    readAllocID := allocs[0]["ID"]
+
+    // read data from volume and assert the writer wrote a file to it
+    require.NoError(err)
+    _, err = readFile(nomadClient, readAllocID, expectedPath)
+    require.NoError(err)
+}
+
+func (tc *CSINodeOnlyPluginEFSTest) AfterEach(f *framework.F) {
+
+    // Stop all jobs in test
+    for _, id := range tc.testJobIDs {
+        out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+        f.Assert().NoError(err, out)
+    }
+    tc.testJobIDs = []string{}
+
+    // Deregister all volumes in test
+    for _, id := range tc.volumeIDs {
+        // make sure all the test jobs have finished unpublishing claims
+        err := waitForVolumeClaimRelease(id, reapWait)
+        f.Assert().NoError(err, "volume claims were not released")
+
+        out, err := e2e.Command("nomad", "volume", "deregister", id)
+        assertNoErrorElseDump(f, err,
+            fmt.Sprintf("could not deregister volume:\n%v", out), tc.pluginJobIDs)
+    }
+    tc.volumeIDs = []string{}
+
+    // Deregister all plugin jobs in test
+    for _, id := range tc.pluginJobIDs {
+        out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
+        f.Assert().NoError(err, out)
+    }
+    tc.pluginJobIDs = []string{}
+
+    // Garbage collect
+    out, err := e2e.Command("nomad", "system", "gc")
+    f.Assert().NoError(err, out)
+}

e2e/csi/input/ebs-volume0.hcl (new file, 21 lines)
@@ -0,0 +1,21 @@
+id        = "ebs-vol[0]"
+name      = "this-is-a-test-0" # CSIVolumeName tag
+type      = "csi"
+plugin_id = "aws-ebs0"
+
+capacity_min = "10GiB"
+capacity_max = "20GiB"
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
+}
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "block-device"
+}
+
+parameters {
+  type = "gp2"
+}

e2e/csi/input/ebs-volume1.hcl (new file, 21 lines)
@@ -0,0 +1,21 @@
+id        = "ebs-vol[1]"
+name      = "this-is-a-test-1" # CSIVolumeName tag
+type      = "csi"
+plugin_id = "aws-ebs0"
+
+capacity_min = "10GiB"
+capacity_max = "20GiB"
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
+}
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "block-device"
+}
+
+parameters {
+  type = "gp2"
+}

@@ -9,9 +9,11 @@ job "use-ebs-volume" {
 
   group "group" {
     volume "test" {
      type = "csi"
      source = "ebs-vol"
-      per_alloc = true
+      attachment_mode = "file-system"
+      access_mode     = "single-node-writer"
+      per_alloc       = true
     }
 
     task "task" {

@@ -9,9 +9,12 @@ job "use-efs-volume" {
   }
 
   group "group" {
+
     volume "test" {
      type = "csi"
      source = "efs-vol0"
+      attachment_mode = "file-system"
+      access_mode     = "single-node-writer"
     }
 
     task "task" {

@@ -8,9 +8,12 @@ job "use-efs-volume" {
   }
 
   group "group" {
+
     volume "test" {
      type = "csi"
      source = "efs-vol0"
+      attachment_mode = "file-system"
+      access_mode     = "single-node-writer"
     }
 
     task "task" {

@@ -15,30 +15,6 @@ resource "aws_efs_mount_target" "csi" {
   security_groups = [aws_security_group.nfs[0].id]
 }
 
-resource "aws_ebs_volume" "csi" {
-  count             = var.volumes ? 1 : 0
-  availability_zone = var.availability_zone
-  size              = 40
-
-  tags = {
-    Name = "${local.random_name}-ebs"
-    User = data.aws_caller_identity.current.arn
-  }
-}
-
-data "template_file" "ebs_volume_hcl" {
-  count    = var.volumes ? 1 : 0
-  template = <<EOT
-type = "csi"
-id = "ebs-vol[0]"
-name = "ebs-vol"
-external_id = "${aws_ebs_volume.csi[0].id}"
-access_mode = "single-node-writer"
-attachment_mode = "file-system"
-plugin_id = "aws-ebs0"
-EOT
-}
-
 data "template_file" "efs_volume_hcl" {
   count    = var.volumes ? 1 : 0
   template = <<EOT
@@ -46,17 +22,19 @@ type = "csi"
 id = "efs-vol0"
 name = "efs-vol0"
 external_id = "${aws_efs_file_system.csi[0].id}"
-access_mode = "single-node-writer"
-attachment_mode = "file-system"
 plugin_id = "aws-efs0"
-EOT
+
+capability {
+  access_mode     = "single-node-writer"
+  attachment_mode = "file-system"
 }
 
-resource "local_file" "ebs_volume_hcl" {
-  count           = var.volumes ? 1 : 0
-  content         = data.template_file.ebs_volume_hcl[0].rendered
-  filename        = "${path.module}/../csi/input/volume-ebs.hcl"
-  file_permission = "0664"
+capability {
+  access_mode     = "single-node-reader"
+  attachment_mode = "file-system"
+}
+
+EOT
 }
 
 resource "local_file" "efs_volume_hcl" {