CSI: retry claims from client when max claims are reached (#12113)

When the alloc runner claims a volume, an allocation for a previous
version of the job may still have the volume claimed because it's
still shutting down. In this case the server returns a max-claims error.
Retry on this error until the claim succeeds or until a very long timeout
expires, to give operators a chance to recover broken plugins.

Make the alloc runner hook tolerant of temporary RPC failures.
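
For reference, a minimal sketch of the capped exponential backoff pattern this change introduces. The names here (`claimWithRetry`, `claimOnce`, `minBackoff`, etc.) are illustrative stand-ins, not the identifiers used in the diff below, and error classification plus logging are omitted:

```go
// Minimal sketch of retry-with-capped-backoff, assuming a claimOnce
// callback stands in for the CSIVolume.Claim RPC. Illustrative only.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// claimWithRetry retries claimOnce with exponential backoff, doubling
// the wait up to maxBackoff, and gives up once maxDuration has elapsed.
func claimWithRetry(ctx context.Context, claimOnce func() error,
	minBackoff, maxBackoff, maxDuration time.Duration) error {

	ctx, cancel := context.WithTimeout(ctx, maxDuration)
	defer cancel()

	backoff := minBackoff
	timer := time.NewTimer(0) // fire immediately on the first attempt
	defer timer.Stop()

	var err error
	for {
		select {
		case <-ctx.Done():
			// overall timeout expired; surface the last RPC error if any
			if err == nil {
				err = ctx.Err()
			}
			return err
		case <-timer.C:
		}

		if err = claimOnce(); err == nil {
			return nil
		}

		// double the wait, but never exceed the cap
		if backoff < maxBackoff {
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
		timer.Reset(backoff)
	}
}

func main() {
	attempts := 0
	err := claimWithRetry(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errors.New("volume max claims reached") // simulated retryable error
		}
		return nil
	}, time.Millisecond, 10*time.Millisecond, time.Second)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

The real hook additionally classifies errors (only max-claims, retryable client RPC, "no servers", and no-leader errors are retried) and uses helper.NewSafeTimer, as shown in the csi_hook.go diff below.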
Tim Gross 2022-02-24 10:39:07 -05:00 committed by GitHub
parent cfe3117af8
commit 22cf24a6bd
3 changed files with 203 additions and 31 deletions

.changelog/12113.txt (new file, 7 additions)

@@ -0,0 +1,7 @@
```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail their first placement after a reschedule
```
```release-note:bug
csi: Fixed a bug where allocations with volume claims would fail to restore after a client restart
```

client/allocrunner/csi_hook.go

@@ -3,6 +3,7 @@ package allocrunner
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
@@ -28,6 +29,7 @@ type csiHook struct {
 	nodeSecret         string
 	volumeRequests     map[string]*volumeAndRequest
+	minBackoffInterval time.Duration
 	maxBackoffInterval time.Duration
 	maxBackoffDuration time.Duration
 }
@@ -47,6 +49,7 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
 		updater:            updater,
 		nodeSecret:         nodeSecret,
 		volumeRequests:     map[string]*volumeAndRequest{},
+		minBackoffInterval: time.Second,
 		maxBackoffInterval: time.Minute,
 		maxBackoffDuration: time.Hour * 24,
 	}
@@ -213,11 +216,10 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
 			},
 		}
-		var resp structs.CSIVolumeClaimResponse
-		if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
+		resp, err := c.claimWithRetry(req)
+		if err != nil {
 			return nil, fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
 		}
 		if resp.Volume == nil {
 			return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
 		}
@@ -230,6 +232,74 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
 	return result, nil
 }
+
+// claimWithRetry tries to claim the volume on the server, retrying
+// with exponential backoff capped to a maximum interval
+func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {
+
+	// note: allocrunner hooks don't have access to the client's
+	// shutdown context, just the allocrunner's shutdown; if we make
+	// it available in the future we should thread it through here so
+	// that retry can exit gracefully instead of dropping the
+	// in-flight goroutine
+	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
+	defer cancel()
+
+	var resp structs.CSIVolumeClaimResponse
+	var err error
+	backoff := c.minBackoffInterval
+	t, stop := helper.NewSafeTimer(0)
+	defer stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, err
+		case <-t.C:
+		}
+
+		err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
+		if err == nil {
+			break
+		}
+		if !isRetryableClaimRPCError(err) {
+			break
+		}
+
+		if backoff < c.maxBackoffInterval {
+			backoff = backoff * 2
+			if backoff > c.maxBackoffInterval {
+				backoff = c.maxBackoffInterval
+			}
+		}
+		c.logger.Debug(
+			"volume could not be claimed because it is in use, retrying in %v", backoff)
+		t.Reset(backoff)
+	}
+
+	return &resp, err
+}
+
+// isRetryableClaimRPCError looks for errors where we need to retry
+// with backoff because we expect them to be eventually resolved.
+func isRetryableClaimRPCError(err error) bool {
+
+	// note: because these errors are returned via RPC which breaks error
+	// wrapping, we can't check with errors.Is and need to read the string
+	errMsg := err.Error()
+	if strings.Contains(errMsg, structs.ErrCSIVolumeMaxClaims.Error()) {
+		return true
+	}
+	if strings.Contains(errMsg, structs.ErrCSIClientRPCRetryable.Error()) {
+		return true
+	}
+	if strings.Contains(errMsg, "no servers") {
+		return true
+	}
+	if strings.Contains(errMsg, structs.ErrNoLeader.Error()) {
+		return true
+	}
+	return false
+}
+
 func (c *csiHook) shouldRun() bool {
 	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
 	for _, vol := range tg.Volumes {
@@ -286,7 +356,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 	ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
 	defer cancel()
 	var err error
-	backoff := time.Second
+	backoff := c.minBackoffInterval
 	t, stop := helper.NewSafeTimer(0)
 	defer stop()
 	for {
@@ -307,6 +377,8 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
 				backoff = c.maxBackoffInterval
 			}
 		}
+		c.logger.Debug(
+			"volume could not be unmounted, retrying in %v", backoff)
 		t.Reset(backoff)
 	}
 	return nil

client/allocrunner/csi_hook_test.go

@@ -7,12 +7,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/pluginmanager"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -34,6 +36,9 @@ func TestCSIHook(t *testing.T) {
 	testcases := []struct {
 		name                   string
 		volumeRequests         map[string]*structs.VolumeRequest
+		startsUnschedulable    bool
+		startsWithClaims       bool
+		expectedClaimErr       error
 		expectedMounts         map[string]*csimanager.MountInfo
 		expectedMountCalls     int
 		expectedUnmountCalls   int
@@ -89,6 +94,58 @@ func TestCSIHook(t *testing.T) {
 			expectedUnpublishCalls: 1,
 		},
+
+		{
+			name: "fatal error on claim",
+			volumeRequests: map[string]*structs.VolumeRequest{
+				"vol0": {
+					Name:           "vol0",
+					Type:           structs.VolumeTypeCSI,
+					Source:         "testvolume0",
+					ReadOnly:       true,
+					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+					MountOptions:   &structs.CSIMountOptions{},
+					PerAlloc:       false,
+				},
+			},
+			startsUnschedulable: true,
+			expectedMounts: map[string]*csimanager.MountInfo{
+				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
+					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
+			},
+			expectedMountCalls:     0,
+			expectedUnmountCalls:   0,
+			expectedClaimCalls:     1,
+			expectedUnpublishCalls: 0,
+			expectedClaimErr: errors.New(
+				"claim volumes: could not claim volume testvolume0: volume is currently unschedulable"),
+		},
+
+		{
+			name: "retryable error on claim",
+			volumeRequests: map[string]*structs.VolumeRequest{
+				"vol0": {
+					Name:           "vol0",
+					Type:           structs.VolumeTypeCSI,
+					Source:         "testvolume0",
+					ReadOnly:       true,
+					AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+					AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+					MountOptions:   &structs.CSIMountOptions{},
+					PerAlloc:       false,
+				},
+			},
+			startsWithClaims: true,
+			expectedMounts: map[string]*csimanager.MountInfo{
+				"vol0": &csimanager.MountInfo{Source: fmt.Sprintf(
+					"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
+			},
+			expectedMountCalls:     1,
+			expectedUnmountCalls:   1,
+			expectedClaimCalls:     2,
+			expectedUnpublishCalls: 1,
+		},
 
 		// TODO: this won't actually work on the client.
 		// https://github.com/hashicorp/nomad/issues/11798
 		//
@@ -136,7 +193,12 @@ func TestCSIHook(t *testing.T) {
 			callCounts := map[string]int{}
 			mgr := mockPluginManager{mounter: mockVolumeMounter{callCounts: callCounts}}
-			rpcer := mockRPCer{alloc: alloc, callCounts: callCounts}
+			rpcer := mockRPCer{
+				alloc:            alloc,
+				callCounts:       callCounts,
+				hasExistingClaim: helper.BoolToPtr(tc.startsWithClaims),
+				schedulable:      helper.BoolToPtr(!tc.startsUnschedulable),
+			}
 			ar := mockAllocRunner{
 				res: &cstructs.AllocHookResources{},
 				caps: &drivers.Capabilities{
@@ -145,17 +207,24 @@ func TestCSIHook(t *testing.T) {
 				},
 			}
 			hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
-			hook.maxBackoffInterval = 100 * time.Millisecond
-			hook.maxBackoffDuration = 2 * time.Second
+			hook.minBackoffInterval = 1 * time.Millisecond
+			hook.maxBackoffInterval = 10 * time.Millisecond
+			hook.maxBackoffDuration = 500 * time.Millisecond
 			require.NotNil(t, hook)
 
+			if tc.expectedClaimErr != nil {
+				require.EqualError(t, hook.Prerun(), tc.expectedClaimErr.Error())
+				mounts := ar.GetAllocHookResources().GetCSIMounts()
+				require.Nil(t, mounts)
+			} else {
 				require.NoError(t, hook.Prerun())
 				mounts := ar.GetAllocHookResources().GetCSIMounts()
 				require.NotNil(t, mounts)
 				require.Equal(t, tc.expectedMounts, mounts)
 				require.NoError(t, hook.Postrun())
+			}
 
 			require.Equal(t, tc.expectedMountCalls, callCounts["mount"])
 			require.Equal(t, tc.expectedUnmountCalls, callCounts["unmount"])
 			require.Equal(t, tc.expectedClaimCalls, callCounts["claim"])
@@ -168,25 +237,11 @@ func TestCSIHook(t *testing.T) {
 // HELPERS AND MOCKS
 
-func testVolume(id string) *structs.CSIVolume {
-	vol := structs.NewCSIVolume(id, 0)
-	vol.Schedulable = true
-	vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
-		{
-			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
-		},
-		{
-			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
-			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
-		},
-	}
-	return vol
-}
-
 type mockRPCer struct {
 	alloc      *structs.Allocation
 	callCounts map[string]int
+	hasExistingClaim *bool
+	schedulable      *bool
 }
 
 // RPC mocks the server RPCs, acting as though any request succeeds
@@ -195,7 +250,7 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
 	case "CSIVolume.Claim":
 		r.callCounts["claim"]++
 		req := args.(*structs.CSIVolumeClaimRequest)
-		vol := testVolume(req.VolumeID)
+		vol := r.testVolume(req.VolumeID)
 		err := vol.Claim(req.ToClaim(), r.alloc)
 		if err != nil {
 			return err
@@ -215,6 +270,44 @@ func (r mockRPCer) RPC(method string, args interface{}, reply interface{}) error
 	return nil
 }
 
+// testVolume is a helper that optionally starts as unschedulable /
+// claimed until after the first claim RPC is made, so that we can
+// test retryable vs non-retryable failures
+func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
+	vol := structs.NewCSIVolume(id, 0)
+	vol.Schedulable = *r.schedulable
+	vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
+		{
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+		},
+		{
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeWriter,
+		},
+	}
+
+	if *r.hasExistingClaim {
+		vol.AccessMode = structs.CSIVolumeAccessModeSingleNodeReader
+		vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
+		vol.ReadClaims["another-alloc-id"] = &structs.CSIVolumeClaim{
+			AllocationID:   "another-alloc-id",
+			NodeID:         "another-node-id",
+			Mode:           structs.CSIVolumeClaimRead,
+			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
+			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+			State:          structs.CSIVolumeClaimStateTaken,
+		}
+	}
+
+	if r.callCounts["claim"] >= 0 {
+		*r.hasExistingClaim = false
+		*r.schedulable = true
+	}
+
+	return vol
+}
+
 type mockVolumeMounter struct {
 	callCounts map[string]int
 }
} }