From aadc9e3017b83916b8bba695028bb71dc70df1e1 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 20 Oct 2016 13:55:35 -0700 Subject: [PATCH] Add implicit signal constraint and validate that a driver can handle the signal. Also fixes a bug with plan and implicit constraints by adding them to the job being planned --- nomad/job_endpoint.go | 120 ++++++++++++++++++++++----- nomad/job_endpoint_test.go | 148 ++++++++++++++++++++++++++++++++++ nomad/structs/funcs.go | 16 ++++ nomad/structs/funcs_test.go | 15 ++++ nomad/structs/structs.go | 50 ++++++++++++ nomad/structs/structs_test.go | 87 ++++++++++++++++++++ 6 files changed, 414 insertions(+), 22 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index fd27eb606..ff70661bd 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -52,6 +52,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Initialize the job fields (sets defaults and any necessary init work). args.Job.Canonicalize() + // Add implicit constraints + setImplicitConstraints(args.Job) + // Validate the job. if err := validateJob(args.Job); err != nil { return err @@ -115,28 +118,6 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } } - - // Add implicit constraints that the task groups are run on a Node with - // Vault - for _, tg := range args.Job.TaskGroups { - _, ok := policies[tg.Name] - if !ok { - // Not requesting Vault - continue - } - - found := false - for _, c := range tg.Constraints { - if c.Equal(vaultConstraint) { - found = true - break - } - } - - if !found { - tg.Constraints = append(tg.Constraints, vaultConstraint) - } - } } // Clear the Vault token @@ -188,6 +169,77 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// setImplicitConstraints adds implicit constraints to the job based on the +// features it is requesting. +func setImplicitConstraints(j *structs.Job) { + // Get the required Vault Policies + policies := j.VaultPolicies() + + // Get the required signals + signals := j.RequiredSignals() + + // Hot path + if len(signals) == 0 && len(policies) == 0 { + return + } + + // Add Vault constraints + for _, tg := range j.TaskGroups { + _, ok := policies[tg.Name] + if !ok { + // Not requesting Vault + continue + } + + found := false + for _, c := range tg.Constraints { + if c.Equal(vaultConstraint) { + found = true + break + } + } + + if !found { + tg.Constraints = append(tg.Constraints, vaultConstraint) + } + } + + // Add signal constraints + for _, tg := range j.TaskGroups { + tgSignals, ok := signals[tg.Name] + if !ok { + // Not requesting Vault + continue + } + + // Flatten the signals + required := structs.MapStringStringSliceValueSet(tgSignals) + sigConstraint := getSignalConstraint(required) + + found := false + for _, c := range tg.Constraints { + if c.Equal(sigConstraint) { + found = true + break + } + } + + if !found { + tg.Constraints = append(tg.Constraints, sigConstraint) + } + } +} + +// getSignalConstraint builds a suitable constraint based on the required +// signals +func getSignalConstraint(signals []string) *structs.Constraint { + return &structs.Constraint{ + Operand: structs.ConstraintSetContains, + LTarget: "${attr.os.signals}", + RTarget: strings.Join(signals, ","), + } +} + // Summary retreives the summary of a job func (j *Job) Summary(args *structs.JobSummaryRequest, reply *structs.JobSummaryResponse) error { @@ -556,6 +608,9 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) // Initialize the job fields (sets defaults and any necessary init work). args.Job.Canonicalize() + // Add implicit constraints + setImplicitConstraints(args.Job) + // Validate the job. if err := validateJob(args.Job); err != nil { return err @@ -656,8 +711,14 @@ func validateJob(job *structs.Job) error { multierror.Append(validationErrors, err) } + // Get the signals required + signals := job.RequiredSignals() + // Validate the driver configurations. for _, tg := range job.TaskGroups { + // Get the signals for the task group + tgSignals, tgOk := signals[tg.Name] + for _, task := range tg.Tasks { d, err := driver.NewDriver( task.Driver, @@ -673,6 +734,21 @@ func validateJob(job *structs.Job) error { formatted := fmt.Errorf("group %q -> task %q -> config: %v", tg.Name, task.Name, err) multierror.Append(validationErrors, formatted) } + + // The task group didn't have any task that required signals + if !tgOk { + continue + } + + // This task requires signals. Ensure the driver is capable + if required, ok := tgSignals[task.Name]; ok { + abilities := d.Abilities() + if !abilities.SendSignals { + formatted := fmt.Errorf("group %q -> task %q: driver %q doesn't support sending signals. Requested signals are %v", + tg.Name, task.Name, task.Driver, strings.Join(required, ", ")) + multierror.Append(validationErrors, formatted) + } + } } } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 856283a7c..ad6799fd6 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -1524,3 +1524,151 @@ func TestJobEndpoint_Plan_NoDiff(t *testing.T) { t.Fatalf("no failed task group alloc metrics") } } + +func TestJobEndpoint_ImplicitConstraints_Vault(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Enable vault + tr, f := true, false + s1.config.VaultConfig.Enabled = &tr + s1.config.VaultConfig.AllowUnauthenticated = &f + + // Replace the Vault Client on the server + tvc := &TestVaultClient{} + s1.vault = tvc + + policy := "foo" + goodToken := structs.GenerateUUID() + goodPolicies := []string{"foo", "bar", "baz"} + tvc.SetLookupTokenAllowedPolicies(goodToken, goodPolicies) + + // Create the register request with a job asking for a vault policy + job := mock.Job() + job.VaultToken = goodToken + job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{ + Policies: []string{policy}, + ChangeMode: structs.VaultChangeModeRestart, + } + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("bad: %v", err) + } + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.JobModifyIndex { + t.Fatalf("index mis-match") + } + + // Check that there is an implicit vault constraint + constraints := out.TaskGroups[0].Constraints + if len(constraints) != 1 { + t.Fatalf("Expected an implicit constraint") + } + + if !constraints[0].Equal(vaultConstraint) { + t.Fatalf("Expected implicit vault constraint") + } +} + +func TestJobEndpoint_ImplicitConstraints_Signals(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request with a job asking for a template that sends a + // signal + job := mock.Job() + signal := "SIGUSR1" + job.TaskGroups[0].Tasks[0].Templates = []*structs.Template{ + &structs.Template{ + SourcePath: "foo", + DestPath: "bar", + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: signal, + }, + } + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("bad: %v", err) + } + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.JobModifyIndex { + t.Fatalf("index mis-match") + } + + // Check that there is an implicit signal constraint + constraints := out.TaskGroups[0].Constraints + if len(constraints) != 1 { + t.Fatalf("Expected an implicit constraint") + } + + sigConstraint := getSignalConstraint([]string{signal}) + + if !constraints[0].Equal(sigConstraint) { + t.Fatalf("Expected implicit vault constraint") + } +} + +func TestJobEndpoint_ValidateJob_InvalidDriverConf(t *testing.T) { + // Create a mock job with an invalid config + job := mock.Job() + job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "foo": "bar", + } + + if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "-> config") { + t.Fatalf("Expected config error; got %v", err) + } +} + +func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) { + // Create a mock job that wants to send a signal to a driver that can't + job := mock.Job() + job.TaskGroups[0].Tasks[0].Driver = "qemu" + job.TaskGroups[0].Tasks[0].Vault = &structs.Vault{ + Policies: []string{"foo"}, + ChangeMode: structs.VaultChangeModeSignal, + ChangeSignal: "SIGUSR1", + } + + if err := validateJob(job); err == nil || !strings.Contains(err.Error(), "support sending signals") { + t.Fatalf("Expected signal feasibility error; got %v", err) + } +} diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index c4aaef7f5..104bb58b4 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -288,3 +288,19 @@ func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { } return flattened } + +// MapStringStringSliceValueSet returns the set of values in a map[string][]string +func MapStringStringSliceValueSet(m map[string][]string) []string { + set := make(map[string]struct{}) + for _, slice := range m { + for _, v := range slice { + set[v] = struct{}{} + } + } + + flat := make([]string, 0, len(set)) + for k := range set { + flat = append(flat, k) + } + return flat +} diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index a7f54937d..059314a72 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -2,6 +2,7 @@ package structs import ( "fmt" + "reflect" "regexp" "testing" ) @@ -284,3 +285,17 @@ func TestSliceStringIsSubset(t *testing.T) { t.Fatalf("bad %v %v", sub, offending) } } + +func TestMapStringStringSliceValueSet(t *testing.T) { + m := map[string][]string{ + "foo": []string{"1", "2"}, + "bar": []string{"3"}, + "baz": nil, + } + + act := MapStringStringSliceValueSet(m) + exp := []string{"1", "2", "3"} + if !reflect.DeepEqual(act, exp) { + t.Fatalf("Bad") + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7459b88fd..a034c6057 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -14,6 +14,7 @@ import ( "path/filepath" "reflect" "regexp" + "sort" "strconv" "strings" "time" @@ -1300,6 +1301,55 @@ func (j *Job) VaultPolicies() map[string]map[string]*Vault { return policies } +// RequiredSignals returns a mapping of task groups to tasks to their required +// set of signals +func (j *Job) RequiredSignals() map[string]map[string][]string { + signals := make(map[string]map[string][]string) + + for _, tg := range j.TaskGroups { + for _, task := range tg.Tasks { + // Use this local one as a set + taskSignals := make(map[string]struct{}) + + // Check if the Vault change mode uses signals + if task.Vault != nil && task.Vault.ChangeMode == VaultChangeModeSignal { + taskSignals[task.Vault.ChangeSignal] = struct{}{} + } + + // Check if any template change mode uses signals + for _, t := range task.Templates { + if t.ChangeMode != TemplateChangeModeSignal { + continue + } + + taskSignals[t.ChangeSignal] = struct{}{} + } + + // Flatten and sort the signals + l := len(taskSignals) + if l == 0 { + continue + } + + flat := make([]string, 0, l) + for sig := range taskSignals { + flat = append(flat, sig) + } + + sort.Strings(flat) + tgSignals, ok := signals[tg.Name] + if !ok { + tgSignals = make(map[string][]string) + signals[tg.Name] = tgSignals + } + tgSignals[task.Name] = flat + } + + } + + return signals +} + // JobListStub is used to return a subset of job information // for the job list type JobListStub struct { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 7ce4cfc86..58519969e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -305,6 +305,93 @@ func TestJob_VaultPolicies(t *testing.T) { } } +func TestJob_RequiredSignals(t *testing.T) { + j0 := &Job{} + e0 := make(map[string]map[string][]string, 0) + + vj1 := &Vault{ + Policies: []string{"p1"}, + ChangeMode: VaultChangeModeNoop, + } + vj2 := &Vault{ + Policies: []string{"p1"}, + ChangeMode: VaultChangeModeSignal, + ChangeSignal: "SIGUSR1", + } + tj1 := &Template{ + SourcePath: "foo", + DestPath: "bar", + ChangeMode: TemplateChangeModeNoop, + } + tj2 := &Template{ + SourcePath: "foo", + DestPath: "bar", + ChangeMode: TemplateChangeModeSignal, + ChangeSignal: "SIGUSR2", + } + j1 := &Job{ + TaskGroups: []*TaskGroup{ + &TaskGroup{ + Name: "foo", + Tasks: []*Task{ + &Task{ + Name: "t1", + }, + &Task{ + Name: "t2", + Vault: vj2, + Templates: []*Template{tj2}, + }, + }, + }, + &TaskGroup{ + Name: "bar", + Tasks: []*Task{ + &Task{ + Name: "t3", + Vault: vj1, + Templates: []*Template{tj1}, + }, + &Task{ + Name: "t4", + Vault: vj2, + }, + }, + }, + }, + } + + e1 := map[string]map[string][]string{ + "foo": map[string][]string{ + "t2": []string{"SIGUSR1", "SIGUSR2"}, + }, + "bar": map[string][]string{ + "t4": []string{"SIGUSR1"}, + }, + } + + cases := []struct { + Job *Job + Expected map[string]map[string][]string + }{ + { + Job: j0, + Expected: e0, + }, + { + Job: j1, + Expected: e1, + }, + } + + for i, c := range cases { + got := c.Job.RequiredSignals() + if !reflect.DeepEqual(got, c.Expected) { + t.Fatalf("case %d: got %#v; want %#v", i+1, got, c.Expected) + } + } +} + func TestTaskGroup_Validate(t *testing.T) { tg := &TaskGroup{ Count: -1,