Allow per_alloc to be used with host volumes (#15780)
Disallowing `per_alloc` for host volumes makes a Nomad user's life much harder. When we rely on `NOMAD_ALLOC_INDEX` for any configuration that needs to be reused across restarts, we need allocation placement to be consistent. With CSI volumes we can use the `per_alloc` feature, but it was explicitly disabled for host volumes. This change teaches host volumes the `per_alloc` concept.
This commit is contained in:
parent f4d6efe69f · commit 2a5c423ae0
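For context before the diff: the change leans on the existing `structs.AllocSuffix` helper, which extracts the bracketed allocation-index suffix from an allocation name. A minimal standalone sketch of that idea (illustrative only, not the code in `nomad/structs`):

```go
package main

import (
	"fmt"
	"strings"
)

// allocSuffix mirrors the idea behind structs.AllocSuffix: return the
// trailing "[n]" portion of an allocation name like "myjob.mygroup.mytask[3]",
// or "" when the name carries no index.
func allocSuffix(allocName string) string {
	idx := strings.LastIndex(allocName, "[")
	if idx == -1 {
		return ""
	}
	return allocName[idx:]
}

func main() {
	suffix := allocSuffix("myjob.mygroup.mytask[3]")
	fmt.Println(suffix)              // [3]
	fmt.Println("myvolume" + suffix) // myvolume[3] -- the volume this alloc will ask for
}
```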
@@ -0,0 +1,3 @@
+```release-note:improvement
+volumes: Allow `per_alloc` to be used with host_volumes
+```
@@ -64,6 +64,9 @@ type TaskPrestartRequest struct {
 	// TaskEnv is the task's environment
 	TaskEnv *taskenv.TaskEnv
 
+	// Alloc is the current version of the allocation
+	Alloc *structs.Allocation
+
 }
 
 type TaskPrestartResponse struct {
@@ -208,6 +208,8 @@ func (tr *TaskRunner) prestart() error {
 	joinedCtx, joinedCancel := joincontext.Join(tr.killCtx, tr.shutdownCtx)
 	defer joinedCancel()
 
+	alloc := tr.Alloc()
+
 	for _, hook := range tr.runnerHooks {
 		pre, ok := hook.(interfaces.TaskPrestartHook)
 		if !ok {
@@ -218,6 +220,7 @@ func (tr *TaskRunner) prestart() error {
 
 		// Build the request
 		req := interfaces.TaskPrestartRequest{
+			Alloc:   alloc,
 			Task:    tr.Task(),
 			TaskDir: tr.taskDir,
 			TaskEnv: tr.envBuilder.Build(),
@@ -32,7 +32,7 @@ func (*volumeHook) Name() string {
 	return "volumes"
 }
 
-func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
+func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig, allocName string) error {
 	var result error
 
 	for _, req := range requestedByAlias {
@@ -42,9 +42,14 @@ func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
 			continue
 		}
 
-		_, ok := clientVolumesByName[req.Source]
+		source := req.Source
+		if req.PerAlloc {
+			source = source + structs.AllocSuffix(allocName)
+		}
+
+		_, ok := clientVolumesByName[source]
 		if !ok {
-			result = multierror.Append(result, fmt.Errorf("missing %s", req.Source))
+			result = multierror.Append(result, fmt.Errorf("missing %s", source))
 		}
 	}
 
@@ -54,7 +59,7 @@ func validateHostVolumes(requestedByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) error {
 
 // hostVolumeMountConfigurations takes the users requested volume mounts,
 // volumes, and the client host volume configuration and converts them into a
 // format that can be used by drivers.
-func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
+func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig, allocName string) ([]*drivers.MountConfig, error) {
 	var mounts []*drivers.MountConfig
 	for _, m := range taskMounts {
 		req, ok := taskVolumesByAlias[m.Volume]
@@ -71,11 +76,15 @@ func (h *volumeHook) hostVolumeMountConfigurations(taskMounts []*structs.VolumeMount, taskVolumesByAlias map[string]*structs.VolumeRequest, clientVolumesByName map[string]*structs.ClientHostVolumeConfig) ([]*drivers.MountConfig, error) {
 			continue
 		}
 
-		hostVolume, ok := clientVolumesByName[req.Source]
+		source := req.Source
+		if req.PerAlloc {
+			source = source + structs.AllocSuffix(allocName)
+		}
+
+		hostVolume, ok := clientVolumesByName[source]
 		if !ok {
 			// Should never happen, but unless the client volumes were mutated during
 			// the execution of this hook.
-			return nil, fmt.Errorf("No host volume named: %s", req.Source)
+			return nil, fmt.Errorf("no host volume named: %s", source)
 		}
 
 		mcfg := &drivers.MountConfig{
@@ -110,12 +119,12 @@ func (h *volumeHook) prepareHostVolumes(req *interfaces.TaskPrestartRequest, vol
 
 	// Always validate volumes to ensure that we do not allow volumes to be used
 	// if a host is restarted and loses the host volume configuration.
-	if err := validateHostVolumes(volumes, hostVolumes); err != nil {
+	if err := validateHostVolumes(volumes, hostVolumes, req.Alloc.Name); err != nil {
 		h.logger.Error("Requested Host Volume does not exist", "existing", hostVolumes, "requested", volumes)
 		return nil, fmt.Errorf("host volume validation error: %v", err)
 	}
 
-	hostVolumeMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes)
+	hostVolumeMounts, err := h.hostVolumeMountConfigurations(req.Task.VolumeMounts, volumes, hostVolumes, req.Alloc.Name)
 	if err != nil {
 		h.logger.Error("Failed to generate host volume mounts", "error", err)
 		return nil, err
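Validation and mount construction above resolve the effective volume source the same way. A hypothetical helper capturing the repeated pattern (not part of this diff, purely illustrative):

```go
// resolveVolumeSource shows the shared logic: a per_alloc request for source
// "data" made by allocation "web.group.task[2]" resolves to the host volume
// named "data[2]"; non-per_alloc requests pass through unchanged.
func resolveVolumeSource(allocName string, req *structs.VolumeRequest) string {
	source := req.Source
	if req.PerAlloc {
		source = source + structs.AllocSuffix(allocName)
	}
	return source
}
```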
@@ -30,8 +30,9 @@ func TestVolumeRequest_Validate(t *testing.T) {
 				"host volumes cannot have an access mode",
 				"host volumes cannot have an attachment mode",
 				"host volumes cannot have mount options",
-				"host volumes do not support per_alloc",
+				"volume cannot be per_alloc when canaries are in use",
 			},
+			canariesCount: 1,
 			req: &VolumeRequest{
 				Type:     VolumeTypeHost,
 				ReadOnly: false,
@@ -79,7 +80,6 @@ func TestVolumeRequest_Validate(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		tc = tc
 		t.Run(tc.name, func(t *testing.T) {
 			err := tc.req.Validate(tc.taskGroupCount, tc.canariesCount)
 			for _, expected := range tc.expected {
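Aside on the dropped line: `tc = tc` was a no-op self-assignment, presumably a slip for the `tc := tc` loop-variable copy idiom; since these subtests never run in parallel, no per-iteration copy is needed at all.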
@@ -129,8 +129,8 @@ func (v *VolumeRequest) Validate(taskGroupCount, canaries int) error {
 		if v.MountOptions != nil {
 			addErr("host volumes cannot have mount options")
 		}
-		if v.PerAlloc {
-			addErr("host volumes do not support per_alloc")
+		if v.PerAlloc && canaries > 0 {
+			addErr("volume cannot be per_alloc when canaries are in use")
 		}
 
 	case VolumeTypeCSI:
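The blanket rejection becomes the same canary restriction the CSI path already enforces. The likely reasoning: a canary reuses the allocation index of the allocation it replaces, so two live allocations would resolve to the same `source[n]` volume. A sketch of the new behavior, assuming an otherwise well-formed request (hypothetical values):

```go
req := &structs.VolumeRequest{
	Type:     structs.VolumeTypeHost,
	Source:   "data",
	PerAlloc: true,
}

// canaries > 0: validation now fails with
// "volume cannot be per_alloc when canaries are in use".
err := req.Validate(3, 1)

// canaries == 0: a per_alloc host volume is accepted -- the point of this PR.
err = req.Validate(3, 0)
_ = err
```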
@@ -149,7 +149,7 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
 }
 
 // SetVolumes takes the volumes required by a task group and updates the checker.
-func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
+func (h *HostVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) {
 	lookupMap := make(map[string][]*structs.VolumeRequest)
 	// Convert the map from map[DesiredName]Request to map[Source][]Request to improve
 	// lookup performance. Also filter non-host volumes.
@@ -158,7 +158,14 @@ func (h *HostVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
 			continue
 		}
 
-		lookupMap[req.Source] = append(lookupMap[req.Source], req)
+		if req.PerAlloc {
+			// provide a unique volume source per allocation
+			copied := req.Copy()
+			copied.Source = copied.Source + structs.AllocSuffix(allocName)
+			lookupMap[copied.Source] = append(lookupMap[copied.Source], copied)
+		} else {
+			lookupMap[req.Source] = append(lookupMap[req.Source], req)
+		}
 	}
 	h.volumes = lookupMap
 }
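With the suffix folded into the lookup map keys, `Feasible` itself needs no change; condensed, what it effectively asks of each candidate node is roughly this (a paraphrase, not the literal implementation):

```go
// hasVolumes: a node is feasible only if every resolved source exists in its
// HostVolumes, and no read/write request points at a read-only host volume.
func hasVolumes(n *structs.Node, volumes map[string][]*structs.VolumeRequest) bool {
	for source, requests := range volumes {
		nodeVolume, ok := n.HostVolumes[source]
		if !ok {
			return false
		}
		for _, req := range requests {
			if !req.ReadOnly && nodeVolume.ReadOnly {
				return false
			}
		}
	}
	return true
}
```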
@@ -103,8 +103,9 @@ func TestHostVolumeChecker(t *testing.T) {
 	}
 	nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {Name: "foo"}}
 	nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
 		"foo": {},
 		"bar": {},
+		"unique-volume[0]": {},
 	}
 	nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
 		"foo": {},
@@ -130,6 +131,11 @@ func TestHostVolumeChecker(t *testing.T) {
 			Type:   "nothost",
 			Source: "baz",
 		},
+		"unique": {
+			Type:     "host",
+			Source:   "unique-volume[0]",
+			PerAlloc: true,
+		},
 	}
 
 	checker := NewHostVolumeChecker(ctx)
@@ -165,8 +171,11 @@ func TestHostVolumeChecker(t *testing.T) {
 		},
 	}
 
+	alloc := mock.Alloc()
+	alloc.NodeID = nodes[2].ID
+
 	for i, c := range cases {
-		checker.SetVolumes(c.RequestedVolumes)
+		checker.SetVolumes(alloc.Name, c.RequestedVolumes)
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}
@@ -235,8 +244,12 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
 			Result: true,
 		},
 	}
 
+	alloc := mock.Alloc()
+	alloc.NodeID = nodes[1].ID
+
 	for i, c := range cases {
-		checker.SetVolumes(c.RequestedVolumes)
+		checker.SetVolumes(alloc.Name, c.RequestedVolumes)
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}
@@ -144,7 +144,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -321,7 +321,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
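Both stacks now hand the allocation name to the host volume checker exactly as they already did for CSI. For reference, a hypothetical sketch of where `options.AllocName` originates upstream in the scheduler:

```go
// The scheduler derives a per-placement allocation name ("job.group[idx]")
// and threads it through SelectOptions; both volume checkers read the index
// from that name. Sketch only -- variable names and call shape are assumptions.
name := structs.AllocName(job.ID, tg.Name, uint(idx))
option := stack.Select(tg, &SelectOptions{AllocName: name})
```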
@@ -105,7 +105,7 @@ a future release.
   stopping any previous allocations. Once the operator determines the canaries
   are healthy, they can be promoted which unblocks a rolling update of the
   remaining allocations at a rate of `max_parallel`. Canary deployments cannot
-  be used with CSI volumes when `per_alloc = true`.
+  be used with volumes when `per_alloc = true`.
 
 - `stagger` `(string: "30s")` - Specifies the delay between each set of
   [`max_parallel`](#max_parallel) updates when updating system jobs. This
@@ -75,6 +75,13 @@ the [volume_mount][volume_mount] stanza in the `task` configuration.
   used for validating `host_volume` ACLs and for scheduling when a
   matching `host_volume` requires `read_only` usage.
 
+- `per_alloc` `(bool: false)` - Specifies that the `source` of the volume
+  should have the suffix `[n]`, where `n` is the allocation index. This allows
+  mounting a unique volume per allocation, so long as the volume's source is
+  named appropriately. For example, with the source `myvolume` and `per_alloc
+  = true`, the allocation named `myjob.mygroup.mytask[0]` will require a
+  volume ID `myvolume[0]`.
+
 The following fields are only valid for volumes with `type = "csi"`:
 
 - `access_mode` `(string: <required>)` - Defines whether a volume should be
@@ -92,13 +99,6 @@ The following fields are only valid for volumes with `type = "csi"`:
   storage providers will support `"block-device"`, which will mount the volume
   with the CSI block device API within the container.
 
-- `per_alloc` `(bool: false)` - Specifies that the `source` of the volume
-  should have the suffix `[n]`, where `n` is the allocation index. This allows
-  mounting a unique volume per allocation, so long as the volume's source is
-  named appropriately. For example, with the source `myvolume` and `per_alloc
-  = true`, the allocation named `myjob.mygroup.mytask[0]` will require a
-  volume ID `myvolume[0]`.
-
 - `mount_options` - Options for mounting CSI volumes that have the
   `file-system` [attachment mode]. These options override the `mount_options`
   field from [volume registration]. Consult the documentation for your storage