da89103c5c
Split the EBS and EFS tests out into their own test cases: * EBS exercises the Controller RPCs, including the create/snapshot workflow. * EFS exercises only the Node RPCs, and assumes we have an existing volume that gets registered, rather than created.
154 lines
4.9 KiB
Go
154 lines
4.9 KiB
Go
package csi
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
|
|
e2e "github.com/hashicorp/nomad/e2e/e2eutil"
|
|
"github.com/hashicorp/nomad/e2e/framework"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// CSINodeOnlyPluginEFSTest exercises the AWS EFS plugin, which is an
|
|
// example of a plugin that can run in Node-only mode.
|
|
type CSINodeOnlyPluginEFSTest struct {
|
|
framework.TC
|
|
uuid string
|
|
testJobIDs []string
|
|
volumeIDs []string
|
|
pluginJobIDs []string
|
|
}
|
|
|
|
const efsPluginID = "aws-efs0"
|
|
|
|
func (tc *CSINodeOnlyPluginEFSTest) BeforeAll(f *framework.F) {
|
|
t := f.T()
|
|
|
|
_, err := os.Stat("csi/input/volume-efs.hcl")
|
|
if err != nil {
|
|
t.Skip("skipping CSI test because EFS volume spec file missing:", err)
|
|
}
|
|
|
|
// Ensure cluster has leader and at least two client
|
|
// nodes in a ready state before running tests
|
|
e2e.WaitForLeader(t, tc.Nomad())
|
|
e2e.WaitForNodesReady(t, tc.Nomad(), 2)
|
|
}
|
|
|
|
// TestEFSVolumeClaim launches AWS EFS plugins and registers an EFS volume
|
|
// as a Nomad CSI volume. We then deploy a job that writes to the volume,
|
|
// and share the volume with another job which should be able to read the
|
|
// data written by the first job.
|
|
func (tc *CSINodeOnlyPluginEFSTest) TestEFSVolumeClaim(f *framework.F) {
|
|
t := f.T()
|
|
require := require.New(t)
|
|
nomadClient := tc.Nomad()
|
|
tc.uuid = uuid.Generate()[0:8]
|
|
|
|
// deploy the node plugins job (no need for a controller for EFS)
|
|
nodesJobID := "aws-efs-plugin-nodes-" + tc.uuid
|
|
f.NoError(e2e.Register(nodesJobID, "csi/input/plugin-aws-efs-nodes.nomad"))
|
|
tc.pluginJobIDs = append(tc.pluginJobIDs, nodesJobID)
|
|
|
|
f.NoError(e2e.WaitForAllocStatusComparison(
|
|
func() ([]string, error) { return e2e.AllocStatuses(nodesJobID, ns) },
|
|
func(got []string) bool {
|
|
for _, status := range got {
|
|
if status != "running" {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}, nil,
|
|
))
|
|
|
|
f.NoError(waitForPluginStatusMinNodeCount(efsPluginID, 2, pluginWait),
|
|
"aws-efs0 node plugins did not become healthy")
|
|
|
|
// register a volume
|
|
volID := "efs-vol0"
|
|
err := volumeRegister(volID, "csi/input/volume-efs.hcl", "register")
|
|
require.NoError(err)
|
|
tc.volumeIDs = append(tc.volumeIDs, volID)
|
|
|
|
// deploy a job that writes to the volume
|
|
writeJobID := "write-efs-" + tc.uuid
|
|
tc.testJobIDs = append(tc.testJobIDs, writeJobID) // ensure failed tests clean up
|
|
f.NoError(e2e.Register(writeJobID, "csi/input/use-efs-volume-write.nomad"))
|
|
f.NoError(
|
|
e2e.WaitForAllocStatusExpected(writeJobID, ns, []string{"running"}),
|
|
"job should be running")
|
|
|
|
allocs, err := e2e.AllocsForJob(writeJobID, ns)
|
|
f.NoError(err, "could not get allocs for write job")
|
|
f.Len(allocs, 1, "could not get allocs for write job")
|
|
writeAllocID := allocs[0]["ID"]
|
|
|
|
// read data from volume and assert the writer wrote a file to it
|
|
expectedPath := "/task/test/" + writeAllocID
|
|
_, err = readFile(nomadClient, writeAllocID, expectedPath)
|
|
require.NoError(err)
|
|
|
|
// Shutdown the writer so we can run a reader.
|
|
// although EFS should support multiple readers, the plugin
|
|
// does not.
|
|
_, err = e2e.Command("nomad", "job", "stop", writeJobID)
|
|
require.NoError(err)
|
|
|
|
// wait for the volume unpublish workflow to complete
|
|
require.NoError(waitForVolumeClaimRelease(volID, reapWait),
|
|
"write-efs alloc claim was not released")
|
|
|
|
// deploy a job that reads from the volume
|
|
readJobID := "read-efs-" + tc.uuid
|
|
tc.testJobIDs = append(tc.testJobIDs, readJobID) // ensure failed tests clean up
|
|
f.NoError(e2e.Register(readJobID, "csi/input/use-efs-volume-read.nomad"))
|
|
f.NoError(
|
|
e2e.WaitForAllocStatusExpected(readJobID, ns, []string{"running"}),
|
|
"job should be running")
|
|
|
|
allocs, err = e2e.AllocsForJob(readJobID, ns)
|
|
f.NoError(err, "could not get allocs for read job")
|
|
f.Len(allocs, 1, "could not get allocs for read job")
|
|
readAllocID := allocs[0]["ID"]
|
|
|
|
// read data from volume and assert the writer wrote a file to it
|
|
require.NoError(err)
|
|
_, err = readFile(nomadClient, readAllocID, expectedPath)
|
|
require.NoError(err)
|
|
}
|
|
|
|
func (tc *CSINodeOnlyPluginEFSTest) AfterEach(f *framework.F) {
|
|
|
|
// Stop all jobs in test
|
|
for _, id := range tc.testJobIDs {
|
|
out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
|
|
f.Assert().NoError(err, out)
|
|
}
|
|
tc.testJobIDs = []string{}
|
|
|
|
// Deregister all volumes in test
|
|
for _, id := range tc.volumeIDs {
|
|
// make sure all the test jobs have finished unpublishing claims
|
|
err := waitForVolumeClaimRelease(id, reapWait)
|
|
f.Assert().NoError(err, "volume claims were not released")
|
|
|
|
out, err := e2e.Command("nomad", "volume", "deregister", id)
|
|
assertNoErrorElseDump(f, err,
|
|
fmt.Sprintf("could not deregister volume:\n%v", out), tc.pluginJobIDs)
|
|
}
|
|
tc.volumeIDs = []string{}
|
|
|
|
// Deregister all plugin jobs in test
|
|
for _, id := range tc.pluginJobIDs {
|
|
out, err := e2e.Command("nomad", "job", "stop", "-purge", id)
|
|
f.Assert().NoError(err, out)
|
|
}
|
|
tc.pluginJobIDs = []string{}
|
|
|
|
// Garbage collect
|
|
out, err := e2e.Command("nomad", "system", "gc")
|
|
f.Assert().NoError(err, out)
|
|
}
|