Add implicit signal constraint and validate that a driver can handle the signal. Also fixes a bug with plan and implicit constraints by adding them to the job being planned

This commit is contained in:
Alex Dadgar 2016-10-20 13:55:35 -07:00
parent 41b5679015
commit aadc9e3017
6 changed files with 414 additions and 22 deletions

View File

@ -52,6 +52,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.Canonicalize()
// Add implicit constraints
setImplicitConstraints(args.Job)
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
@ -115,28 +118,6 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}
}
// Add implicit constraints that the task groups are run on a Node with
// Vault
for _, tg := range args.Job.TaskGroups {
_, ok := policies[tg.Name]
if !ok {
// Not requesting Vault
continue
}
found := false
for _, c := range tg.Constraints {
if c.Equal(vaultConstraint) {
found = true
break
}
}
if !found {
tg.Constraints = append(tg.Constraints, vaultConstraint)
}
}
}
// Clear the Vault token
@ -188,6 +169,77 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}
// setImplicitConstraints adds implicit constraints to the job based on the
// features it is requesting.
func setImplicitConstraints(j *structs.Job) {
// Get the required Vault Policies
policies := j.VaultPolicies()
// Get the required signals
signals := j.RequiredSignals()
// Hot path
if len(signals) == 0 && len(policies) == 0 {
return
}
// Add Vault constraints
for _, tg := range j.TaskGroups {
_, ok := policies[tg.Name]
if !ok {
// Not requesting Vault
continue
}
found := false
for _, c := range tg.Constraints {
if c.Equal(vaultConstraint) {
found = true
break
}
}
if !found {
tg.Constraints = append(tg.Constraints, vaultConstraint)
}
}
// Add signal constraints
for _, tg := range j.TaskGroups {
tgSignals, ok := signals[tg.Name]
if !ok {
// Not requesting Vault
continue
}
// Flatten the signals
required := structs.MapStringStringSliceValueSet(tgSignals)
sigConstraint := getSignalConstraint(required)
found := false
for _, c := range tg.Constraints {
if c.Equal(sigConstraint) {
found = true
break
}
}
if !found {
tg.Constraints = append(tg.Constraints, sigConstraint)
}
}
}
// getSignalConstraint builds a suitable constraint based on the required
// signals
func getSignalConstraint(signals []string) *structs.Constraint {
return &structs.Constraint{
Operand: structs.ConstraintSetContains,
LTarget: "${attr.os.signals}",
RTarget: strings.Join(signals, ","),
}
}
// Summary retreives the summary of a job
func (j *Job) Summary(args *structs.JobSummaryRequest,
reply *structs.JobSummaryResponse) error {
@ -556,6 +608,9 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.Canonicalize()
// Add implicit constraints
setImplicitConstraints(args.Job)
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
@ -656,8 +711,14 @@ func validateJob(job *structs.Job) error {
multierror.Append(validationErrors, err)
}
// Get the signals required
signals := job.RequiredSignals()
// Validate the driver configurations.
for _, tg := range job.TaskGroups {
// Get the signals for the task group
tgSignals, tgOk := signals[tg.Name]
for _, task := range tg.Tasks {
d, err := driver.NewDriver(
task.Driver,
@ -673,6 +734,21 @@ func validateJob(job *structs.Job) error {
formatted := fmt.Errorf("group %q -> task %q -> config: %v", tg.Name, task.Name, err)
multierror.Append(validationErrors, formatted)
}
// The task group didn't have any task that required signals
if !tgOk {
continue
}
// This task requires signals. Ensure the driver is capable
if required, ok := tgSignals[task.Name]; ok {
abilities := d.Abilities()
if !abilities.SendSignals {
formatted := fmt.Errorf("group %q -> task %q: driver %q doesn't support sending signals. Requested signals are %v",
tg.Name, task.Name, task.Driver, strings.Join(required, ", "))
multierror.Append(validationErrors, formatted)
}
}
}
}

View File

@ -1524,3 +1524,151 @@ func TestJobEndpoint_Plan_NoDiff(t *testing.T) {
t.Fatalf("no failed task group alloc metrics")
}
}
func TestJobEndpoint_ImplicitConstraints_Vault(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Enable vault
tr, f := true, false
s1.config.VaultConfig.Enabled = &tr
s1.config.VaultConfig.AllowUnauthenticated = &f
// Replace the Vault Client on the server
tvc := &TestVaultClient{}
s1.vault = tvc
policy := "foo"
goodToken := structs.GenerateUUID()
goodPolicies := []string{"foo", "bar", "baz"}
tvc.SetLookupTokenAllowedPolicies(goodToken, goodPolicies)
// Create the register request with a job asking for a vault policy
job := mock.Job()
job.VaultToken = goodToken
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
Policies: []string{policy},
ChangeMode: structs.VaultChangeModeRestart,
}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("bad: %v", err)
}
// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
// Check that there is an implicit vault constraint
constraints := out.TaskGroups[0].Constraints
if len(constraints) != 1 {
t.Fatalf("Expected an implicit constraint")
}
if !constraints[0].Equal(vaultConstraint) {
t.Fatalf("Expected implicit vault constraint")
}
}
func TestJobEndpoint_ImplicitConstraints_Signals(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request with a job asking for a template that sends a
// signal
job := mock.Job()
signal := "SIGUSR1"
job.TaskGroups[0].Tasks[0].Templates = []*structs.Template{
&structs.Template{
SourcePath: "foo",
DestPath: "bar",
ChangeMode: structs.TemplateChangeModeSignal,
ChangeSignal: signal,
},
}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("bad: %v", err)
}
// Check for the job in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
// Check that there is an implicit signal constraint
constraints := out.TaskGroups[0].Constraints
if len(constraints) != 1 {
t.Fatalf("Expected an implicit constraint")
}
sigConstraint := getSignalConstraint([]string{signal})
if !constraints[0].Equal(sigConstraint) {
t.Fatalf("Expected implicit vault constraint")
}
}
func TestJobEndpoint_ValidateJob_InvalidDriverConf(t *testing.T) {
// Create a mock job with an invalid config
job := mock.Job()
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"foo": "bar",
}
if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "-> config") {
t.Fatalf("Expected config error; got %v", err)
}
}
func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) {
// Create a mock job that wants to send a signal to a driver that can't
job := mock.Job()
job.TaskGroups[0].Tasks[0].Driver = "qemu"
job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{
Policies: []string{"foo"},
ChangeMode: structs.VaultChangeModeSignal,
ChangeSignal: "SIGUSR1",
}
if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "support sending signals") {
t.Fatalf("Expected signal feasibility error; got %v", err)
}
}

View File

@ -288,3 +288,19 @@ func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
}
return flattened
}
// MapStringStringSliceValueSet returns the set of values in a map[string][]string
func MapStringStringSliceValueSet(m map[string][]string) []string {
set := make(map[string]struct{})
for _, slice := range m {
for _, v := range slice {
set[v] = struct{}{}
}
}
flat := make([]string, 0, len(set))
for k := range set {
flat = append(flat, k)
}
return flat
}

View File

@ -2,6 +2,7 @@ package structs
import (
"fmt"
"reflect"
"regexp"
"testing"
)
@ -284,3 +285,17 @@ func TestSliceStringIsSubset(t *testing.T) {
t.Fatalf("bad %v %v", sub, offending)
}
}
func TestMapStringStringSliceValueSet(t *testing.T) {
m := map[string][]string{
"foo": []string{"1", "2"},
"bar": []string{"3"},
"baz": nil,
}
act := MapStringStringSliceValueSet(m)
exp := []string{"1", "2", "3"}
if !reflect.DeepEqual(act, exp) {
t.Fatalf("Bad")
}
}

View File

@ -14,6 +14,7 @@ import (
"path/filepath"
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -1300,6 +1301,55 @@ func (j *Job) VaultPolicies() map[string]map[string]*Vault {
return policies
}
// RequiredSignals returns a mapping of task groups to tasks to their required
// set of signals
func (j *Job) RequiredSignals() map[string]map[string][]string {
signals := make(map[string]map[string][]string)
for _, tg := range j.TaskGroups {
for _, task := range tg.Tasks {
// Use this local one as a set
taskSignals := make(map[string]struct{})
// Check if the Vault change mode uses signals
if task.Vault != nil && task.Vault.ChangeMode == VaultChangeModeSignal {
taskSignals[task.Vault.ChangeSignal] = struct{}{}
}
// Check if any template change mode uses signals
for _, t := range task.Templates {
if t.ChangeMode != TemplateChangeModeSignal {
continue
}
taskSignals[t.ChangeSignal] = struct{}{}
}
// Flatten and sort the signals
l := len(taskSignals)
if l == 0 {
continue
}
flat := make([]string, 0, l)
for sig := range taskSignals {
flat = append(flat, sig)
}
sort.Strings(flat)
tgSignals, ok := signals[tg.Name]
if !ok {
tgSignals = make(map[string][]string)
signals[tg.Name] = tgSignals
}
tgSignals[task.Name] = flat
}
}
return signals
}
// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {

View File

@ -305,6 +305,93 @@ func TestJob_VaultPolicies(t *testing.T) {
}
}
func TestJob_RequiredSignals(t *testing.T) {
j0 := &Job{}
e0 := make(map[string]map[string][]string, 0)
vj1 := &Vault{
Policies: []string{"p1"},
ChangeMode: VaultChangeModeNoop,
}
vj2 := &Vault{
Policies: []string{"p1"},
ChangeMode: VaultChangeModeSignal,
ChangeSignal: "SIGUSR1",
}
tj1 := &Template{
SourcePath: "foo",
DestPath: "bar",
ChangeMode: TemplateChangeModeNoop,
}
tj2 := &Template{
SourcePath: "foo",
DestPath: "bar",
ChangeMode: TemplateChangeModeSignal,
ChangeSignal: "SIGUSR2",
}
j1 := &Job{
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "foo",
Tasks: []*Task{
&Task{
Name: "t1",
},
&Task{
Name: "t2",
Vault: vj2,
Templates: []*Template{tj2},
},
},
},
&TaskGroup{
Name: "bar",
Tasks: []*Task{
&Task{
Name: "t3",
Vault: vj1,
Templates: []*Template{tj1},
},
&Task{
Name: "t4",
Vault: vj2,
},
},
},
},
}
e1 := map[string]map[string][]string{
"foo": map[string][]string{
"t2": []string{"SIGUSR1", "SIGUSR2"},
},
"bar": map[string][]string{
"t4": []string{"SIGUSR1"},
},
}
cases := []struct {
Job *Job
Expected map[string]map[string][]string
}{
{
Job: j0,
Expected: e0,
},
{
Job: j1,
Expected: e1,
},
}
for i, c := range cases {
got := c.Job.RequiredSignals()
if !reflect.DeepEqual(got, c.Expected) {
t.Fatalf("case %d: got %#v; want %#v", i+1, got, c.Expected)
}
}
}
func TestTaskGroup_Validate(t *testing.T) {
tg := &TaskGroup{
Count: -1,