Merge pull request #2654 from hashicorp/f-env-consul

Add envconsul-like support and refactor environment handling
This commit is contained in:
Michael Schurter 2017-05-30 14:40:14 -07:00 committed by GitHub
commit dd51aa1cb9
35 changed files with 1085 additions and 814 deletions

View file

@ -268,6 +268,11 @@ func TestJobs_Canonicalize(t *testing.T) {
EmbeddedTmpl: helper.StringToPtr("---"),
DestPath: helper.StringToPtr("local/file.yml"),
},
{
EmbeddedTmpl: helper.StringToPtr("FOO=bar\n"),
DestPath: helper.StringToPtr("local/file.env"),
Envvars: helper.BoolToPtr(true),
},
},
},
},
@ -377,6 +382,19 @@ func TestJobs_Canonicalize(t *testing.T) {
Perms: helper.StringToPtr("0644"),
LeftDelim: helper.StringToPtr("{{"),
RightDelim: helper.StringToPtr("}}"),
Envvars: helper.BoolToPtr(false),
},
{
SourcePath: helper.StringToPtr(""),
DestPath: helper.StringToPtr("local/file.env"),
EmbeddedTmpl: helper.StringToPtr("FOO=bar\n"),
ChangeMode: helper.StringToPtr("restart"),
ChangeSignal: helper.StringToPtr(""),
Splay: helper.TimeToPtr(5 * time.Second),
Perms: helper.StringToPtr("0644"),
LeftDelim: helper.StringToPtr("{{"),
RightDelim: helper.StringToPtr("}}"),
Envvars: helper.BoolToPtr(true),
},
},
},

View file

@ -336,6 +336,7 @@ type Template struct {
Perms *string `mapstructure:"perms"`
LeftDelim *string `mapstructure:"left_delimiter"`
RightDelim *string `mapstructure:"right_delimiter"`
Envvars *bool `mapstructure:"env"`
}
func (tmpl *Template) Canonicalize() {
@ -373,6 +374,9 @@ func (tmpl *Template) Canonicalize() {
if tmpl.RightDelim == nil {
tmpl.RightDelim = helper.StringToPtr("}}")
}
if tmpl.Envvars == nil {
tmpl.Envvars = helper.BoolToPtr(false)
}
}
type Vault struct {

View file

@ -135,13 +135,12 @@ func TestAllocRunner_RetryArtifact(t *testing.T) {
}
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Driver = "mock_driver"
task.Config["run_for"] = "10s"
go ar.Run()
testutil.WaitForResult(func() (bool, error) {
@ -234,13 +233,12 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
}
func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Driver = "mock_driver"
task.Config["run_for"] = "10s"
go ar.Run()
start := time.Now()
@ -269,7 +267,7 @@ func TestAllocRunner_Destroy(t *testing.T) {
return nil
}); err != nil {
return false, fmt.Errorf("state not destroyed")
return false, fmt.Errorf("state not destroyed: %v", err)
}
// Check the alloc directory was cleaned
@ -290,13 +288,12 @@ func TestAllocRunner_Destroy(t *testing.T) {
}
func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
_, ar := testAllocRunner(false)
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
task.Driver = "mock_driver"
task.Config["run_for"] = "10s"
go ar.Run()
defer ar.Destroy()
@ -612,6 +609,7 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
logger := testLogger()
conf := config.DefaultConfig()
conf.Node = mock.Node()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
tmp, err := ioutil.TempFile("", "state-db")

View file

@ -949,7 +949,7 @@ func (c *Client) setupDrivers() error {
var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil, nil)
driverCtx := driver.NewDriverContext("", "", c.config, c.config.Node, c.logger, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.

View file

@ -1,7 +1,10 @@
package client
import (
"bufio"
"bytes"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
@ -76,7 +79,7 @@ type TaskTemplateManager struct {
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
config *config.Config, vaultToken, taskDir string,
taskEnv *env.TaskEnvironment) (*TaskTemplateManager, error) {
envBuilder *env.Builder) (*TaskTemplateManager, error) {
// Check pre-conditions
if hook == nil {
@ -85,7 +88,7 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
return nil, fmt.Errorf("Invalid config given")
} else if taskDir == "" {
return nil, fmt.Errorf("Invalid task directory given")
} else if taskEnv == nil {
} else if envBuilder == nil {
return nil, fmt.Errorf("Invalid task environment given")
}
@ -114,14 +117,14 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
}
// Build the consul-template runner
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, taskEnv)
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build())
if err != nil {
return nil, err
}
tm.runner = runner
tm.lookup = lookup
go tm.run()
go tm.run(envBuilder, taskDir)
return tm, nil
}
@ -144,7 +147,7 @@ func (tm *TaskTemplateManager) Stop() {
}
// run is the long lived loop that handles errors and templates being rendered
func (tm *TaskTemplateManager) run() {
func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) {
// Runner is nil if there is no templates
if tm.runner == nil {
// Unblock the start if there is nothing to do
@ -192,6 +195,14 @@ WAIT:
}
}
// Read environment variables from env templates
envMap, err := loadTemplateEnv(tm.templates, taskDir)
if err != nil {
tm.hook.Kill("consul-template", err.Error(), true)
return
}
envBuilder.SetTemplateEnv(envMap)
allRenderedTime = time.Now()
tm.hook.UnblockStart("consul-template")
@ -242,6 +253,14 @@ WAIT:
return
}
// Read environment variables from templates
envMap, err := loadTemplateEnv(tmpls, taskDir)
if err != nil {
tm.hook.Kill("consul-template", err.Error(), true)
return
}
envBuilder.SetTemplateEnv(envMap)
for _, tmpl := range tmpls {
switch tmpl.ChangeMode {
case structs.TemplateChangeModeSignal:
@ -317,7 +336,7 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool {
// lookup by destination to the template. If no templates are given, a nil
// template runner and lookup is returned.
func templateRunner(tmpls []*structs.Template, config *config.Config,
vaultToken, taskDir string, taskEnv *env.TaskEnvironment) (
vaultToken, taskDir string, taskEnv *env.TaskEnv) (
*manager.Runner, map[string][]*structs.Template, error) {
if len(tmpls) == 0 {
@ -350,7 +369,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
}
// Set Nomad's environment variables
runner.Env = taskEnv.Build().EnvMapAll()
runner.Env = taskEnv.All()
// Build the lookup
idMap := runner.TemplateConfigMapping()
@ -368,9 +387,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config,
// parseTemplateConfigs converts the tasks templates into consul-templates
func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
taskEnv *env.TaskEnvironment, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
// Build the task environment
taskEnv.Build()
taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls))
for _, tmpl := range tmpls {
@ -492,3 +509,63 @@ func runnerConfig(config *config.Config, vaultToken string) (*ctconf.Config, err
conf.Finalize()
return conf, nil
}
// loadTemplateEnv loads task environment variables from all templates.
func loadTemplateEnv(tmpls []*structs.Template, taskDir string) (map[string]string, error) {
all := make(map[string]string, 50)
for _, t := range tmpls {
if !t.Envvars {
continue
}
f, err := os.Open(filepath.Join(taskDir, t.DestPath))
if err != nil {
return nil, fmt.Errorf("error opening env template: %v", err)
}
defer f.Close()
// Parse environment fil
vars, err := parseEnvFile(f)
if err != nil {
return nil, fmt.Errorf("error parsing env template %q: %v", t.DestPath, err)
}
for k, v := range vars {
all[k] = v
}
}
return all, nil
}
// parseEnvFile and return a map of the environment variables suitable for
// TaskEnvironment.AppendEnvvars or an error.
//
// See nomad/structs#Template.Envvars comment for format.
func parseEnvFile(r io.Reader) (map[string]string, error) {
vars := make(map[string]string, 50)
lines := 0
scanner := bufio.NewScanner(r)
for scanner.Scan() {
lines++
buf := scanner.Bytes()
if len(buf) == 0 {
// Skip empty lines
continue
}
if buf[0] == '#' {
// Skip lines starting with a #
continue
}
n := bytes.IndexByte(buf, '=')
if n == -1 {
return nil, fmt.Errorf("line %d: no '=' sign: %q", lines, string(buf))
}
if len(buf) > n {
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
} else {
vars[string(buf[0:n])] = ""
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return vars, nil
}

View file

@ -91,7 +91,7 @@ type testHarness struct {
manager *TaskTemplateManager
mockHooks *MockTaskHooks
templates []*structs.Template
taskEnv *env.TaskEnvironment
envBuilder *env.Builder
node *structs.Node
config *config.Config
vaultToken string
@ -103,18 +103,22 @@ type testHarness struct {
// newTestHarness returns a harness starting a dev consul and vault server,
// building the appropriate config and creating a TaskTemplateManager
func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault bool) *testHarness {
region := "global"
harness := &testHarness{
mockHooks: NewMockTaskHooks(),
templates: templates,
node: mock.Node(),
config: &config.Config{},
config: &config.Config{Region: region},
}
// Build the task environment
harness.taskEnv = env.NewTaskEnvironment(harness.node).SetTaskName(TestTaskName)
a := mock.Alloc()
task := a.Job.TaskGroups[0].Tasks[0]
task.Name = TestTaskName
harness.envBuilder = env.NewBuilder(harness.node, a, task, region)
// Make a tempdir
d, err := ioutil.TempDir("", "")
d, err := ioutil.TempDir("", "ct_test")
if err != nil {
t.Fatalf("Failed to make tmpdir: %v", err)
}
@ -141,7 +145,7 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b
func (h *testHarness) start(t *testing.T) {
manager, err := NewTaskTemplateManager(h.mockHooks, h.templates,
h.config, h.vaultToken, h.taskDir, h.taskEnv)
h.config, h.vaultToken, h.taskDir, h.envBuilder)
if err != nil {
t.Fatalf("failed to build task template manager: %v", err)
}
@ -151,7 +155,7 @@ func (h *testHarness) start(t *testing.T) {
func (h *testHarness) startWithErr() error {
manager, err := NewTaskTemplateManager(h.mockHooks, h.templates,
h.config, h.vaultToken, h.taskDir, h.taskEnv)
h.config, h.vaultToken, h.taskDir, h.envBuilder)
h.manager = manager
return err
}
@ -175,27 +179,29 @@ func (h *testHarness) stop() {
func TestTaskTemplateManager_Invalid(t *testing.T) {
hooks := NewMockTaskHooks()
var tmpls []*structs.Template
config := &config.Config{}
region := "global"
config := &config.Config{Region: region}
taskDir := "foo"
vaultToken := ""
taskEnv := env.NewTaskEnvironment(mock.Node())
a := mock.Alloc()
envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], config.Region)
_, err := NewTaskTemplateManager(nil, nil, nil, "", "", nil)
if err == nil {
t.Fatalf("Expected error")
}
_, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, taskEnv)
_, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "task hook") {
t.Fatalf("Expected invalid task hook error: %v", err)
}
_, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, taskEnv)
_, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "config") {
t.Fatalf("Expected invalid config error: %v", err)
}
_, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", taskEnv)
_, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", envBuilder)
if err == nil || !strings.Contains(err.Error(), "task directory") {
t.Fatalf("Expected invalid task dir error: %v", err)
}
@ -205,7 +211,7 @@ func TestTaskTemplateManager_Invalid(t *testing.T) {
t.Fatalf("Expected invalid task environment error: %v", err)
}
tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, taskEnv)
tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if tm == nil {
@ -221,7 +227,7 @@ func TestTaskTemplateManager_Invalid(t *testing.T) {
}
tmpls = append(tmpls, tmpl)
tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, taskEnv)
tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder)
if err == nil || !strings.Contains(err.Error(), "Failed to parse signal") {
t.Fatalf("Expected signal parsing error: %v", err)
}
@ -905,3 +911,125 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) {
t.Fatalf("Unexpected error: %v", harness.mockHooks.KillReason)
}
}
// TestTaskTemplateManager_Env asserts templates with the env flag set are read
// into the task's environment.
func TestTaskTemplateManager_Env(t *testing.T) {
template := &structs.Template{
EmbeddedTmpl: `
# Comment lines are ok
FOO=bar
foo=123
ANYTHING-goes=Spaces are=ok!
`,
DestPath: "test.env",
ChangeMode: structs.TemplateChangeModeNoop,
Envvars: true,
}
harness := newTestHarness(t, []*structs.Template{template}, true, false)
harness.start(t)
defer harness.stop()
// Wait a little
select {
case <-harness.mockHooks.UnblockCh:
case <-time.After(time.Duration(2*testutil.TestMultiplier()) * time.Second):
t.Fatalf("Should have received unblock: %+v", harness.mockHooks)
}
// Validate environment
env := harness.envBuilder.Build().Map()
if len(env) < 3 {
t.Fatalf("expected at least 3 env vars but found %d:\n%#v\n", len(env), env)
}
if env["FOO"] != "bar" {
t.Errorf("expected FOO=bar but found %q", env["FOO"])
}
if env["foo"] != "123" {
t.Errorf("expected foo=123 but found %q", env["foo"])
}
if env["ANYTHING_goes"] != "Spaces are=ok!" {
t.Errorf("expected ANYTHING_GOES='Spaces are ok!' but found %q", env["ANYTHING_goes"])
}
}
// TestTaskTemplateManager_Env_Missing asserts the core env
// template processing function returns errors when files don't exist
func TestTaskTemplateManager_Env_Missing(t *testing.T) {
d, err := ioutil.TempDir("", "ct_env_missing")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(d)
// Fake writing the file so we don't have to run the whole template manager
err = ioutil.WriteFile(filepath.Join(d, "exists.env"), []byte("FOO=bar\n"), 0644)
if err != nil {
t.Fatalf("error writing template file: %v", err)
}
templates := []*structs.Template{
{
EmbeddedTmpl: "FOO=bar\n",
DestPath: "exists.env",
Envvars: true,
},
{
EmbeddedTmpl: "WHAT=ever\n",
DestPath: "missing.env",
Envvars: true,
},
}
if vars, err := loadTemplateEnv(templates, d); err == nil {
t.Fatalf("expected an error but instead got env vars: %#v", vars)
}
}
// TestTaskTemplateManager_Env_Multi asserts the core env
// template processing function returns combined env vars from multiple
// templates correctly.
func TestTaskTemplateManager_Env_Multi(t *testing.T) {
d, err := ioutil.TempDir("", "ct_env_missing")
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(d)
// Fake writing the files so we don't have to run the whole template manager
err = ioutil.WriteFile(filepath.Join(d, "zzz.env"), []byte("FOO=bar\nSHARED=nope\n"), 0644)
if err != nil {
t.Fatalf("error writing template file 1: %v", err)
}
err = ioutil.WriteFile(filepath.Join(d, "aaa.env"), []byte("BAR=foo\nSHARED=yup\n"), 0644)
if err != nil {
t.Fatalf("error writing template file 2: %v", err)
}
// Templates will get loaded in order (not alpha sorted)
templates := []*structs.Template{
{
DestPath: "zzz.env",
Envvars: true,
},
{
DestPath: "aaa.env",
Envvars: true,
},
}
vars, err := loadTemplateEnv(templates, d)
if err != nil {
t.Fatalf("expected an error but instead got env vars: %#v", vars)
}
if vars["FOO"] != "bar" {
t.Errorf("expected FOO=bar but found %q", vars["FOO"])
}
if vars["BAR"] != "foo" {
t.Errorf("expected BAR=foo but found %q", vars["BAR"])
}
if vars["SHARED"] != "yup" {
t.Errorf("expected FOO=bar but found %q", vars["yup"])
}
}

View file

@ -180,7 +180,7 @@ func (c *DockerDriverConfig) Validate() error {
// NewDockerDriverConfig returns a docker driver config by parsing the HCL
// config
func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*DockerDriverConfig, error) {
func NewDockerDriverConfig(task *structs.Task, env *env.TaskEnv) (*DockerDriverConfig, error) {
var dconf DockerDriverConfig
if err := mapstructure.WeakDecode(task.Config, &dconf); err != nil {
@ -455,8 +455,8 @@ func (d *DockerDriver) getDockerCoordinator(client *docker.Client) (*dockerCoord
return GetDockerCoordinator(config), fmt.Sprintf("%s-%s", d.DriverContext.allocID, d.DriverContext.taskName)
}
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
driverConfig, err := NewDockerDriverConfig(task, d.taskEnv)
func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*PrestartResponse, error) {
driverConfig, err := NewDockerDriverConfig(task, ctx.TaskEnv)
if err != nil {
return nil, err
}
@ -476,10 +476,11 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedR
return nil, err
}
res := NewCreatedResources()
res.Add(dockerImageResKey, id)
resp := NewPrestartResponse()
resp.CreatedResources.Add(dockerImageResKey, id)
resp.PortMap = d.driverConfig.PortMap
d.imageID = id
return res, nil
return resp, nil
}
func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
@ -495,7 +496,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: ctx.TaskEnv,
Task: task,
Driver: "docker",
AllocID: d.DriverContext.allocID,
@ -899,14 +900,11 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort)
}
d.taskEnv.SetPortMap(driverConfig.PortMap)
hostConfig.PortBindings = publishedPorts
config.ExposedPorts = exposedPorts
}
d.taskEnv.Build()
parsedArgs := d.taskEnv.ParseAndReplace(driverConfig.Args)
parsedArgs := ctx.TaskEnv.ParseAndReplace(driverConfig.Args)
// If the user specified a custom command to run, we'll inject it here.
if driverConfig.Command != "" {
@ -930,7 +928,7 @@ func (d *DockerDriver) createContainerConfig(ctx *ExecContext, task *structs.Tas
d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels)
}
config.Env = d.taskEnv.EnvList()
config.Env = ctx.TaskEnv.List()
containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)

View file

@ -108,12 +108,16 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
driver := NewDockerDriver(tctx.DriverCtx)
copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar")
res, err := driver.Prestart(tctx.ExecCtx, task)
resp, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
t.Fatalf("error in prestart: %v", err)
}
// At runtime this is handled by TaskRunner
tctx.EnvBuilder.SetPortMap(resp.PortMap)
tctx.ExecCtx.TaskEnv = tctx.EnvBuilder.Build()
handle, err := driver.Start(tctx.ExecCtx, task)
if err != nil {
tctx.AllocDir.Destroy()
@ -126,7 +130,7 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie
}
cleanup := func() {
driver.Cleanup(tctx.ExecCtx, res)
driver.Cleanup(tctx.ExecCtx, resp.CreatedResources)
handle.Kill()
tctx.AllocDir.Destroy()
}
@ -916,7 +920,7 @@ func TestDockerDriver_PortsMapping(t *testing.T) {
for key, val := range expectedEnvironment {
search := fmt.Sprintf("%s=%s", key, val)
if !inSlice(search, container.Config.Env) {
t.Errorf("Expected to find %s in container environment: %+v", search, container.Config.Env)
t.Errorf("Expected to find %s in container environment:\n%s\n\n", search, strings.Join(container.Config.Env, "\n"))
}
}
}
@ -1106,7 +1110,8 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
}
alloc := mock.Alloc()
execCtx := NewExecContext(taskDir)
envBuilder := env.NewBuilder(cfg.Node, alloc, task, cfg.Region)
execCtx := NewExecContext(taskDir, envBuilder.Build())
cleanup := func() {
allocDir.Destroy()
if filepath.IsAbs(hostpath) {
@ -1114,17 +1119,11 @@ func setupDockerVolumes(t *testing.T, cfg *config.Config, hostpath string) (*str
}
}
taskEnv, err := GetTaskEnv(taskDir, cfg.Node, task, alloc, cfg, "")
if err != nil {
cleanup()
t.Fatalf("Failed to get task env: %v", err)
}
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), taskEnv, emitter)
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, testLogger(), emitter)
driver := NewDockerDriver(driverCtx)
copyImage(t, taskDir, "busybox.tar")
@ -1255,10 +1254,11 @@ func TestDockerDriver_Cleanup(t *testing.T) {
// Run Prestart
driver := NewDockerDriver(tctx.DriverCtx).(*DockerDriver)
res, err := driver.Prestart(tctx.ExecCtx, task)
resp, err := driver.Prestart(tctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
res := resp.CreatedResources
if len(res.Resources) == 0 || len(res.Resources[dockerImageResKey]) == 0 {
t.Fatalf("no created resources: %#v", res)
}

View file

@ -8,7 +8,6 @@ import (
"io"
"log"
"os"
"strings"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
@ -47,13 +46,31 @@ func NewDriver(name string, ctx *DriverContext) (Driver, error) {
}
// Instantiate the driver
f := factory(ctx)
return f, nil
d := factory(ctx)
return d, nil
}
// Factory is used to instantiate a new Driver
type Factory func(*DriverContext) Driver
// PrestartResponse is driver state returned by Driver.Prestart.
type PrestartResponse struct {
// CreatedResources by the driver.
CreatedResources *CreatedResources
// PortMap can be set by drivers to replace ports in environment
// variables with driver-specific mappings.
PortMap map[string]int
}
// NewPrestartResponse creates a new PrestartResponse with CreatedResources
// initialized.
func NewPrestartResponse() *PrestartResponse {
return &PrestartResponse{
CreatedResources: NewCreatedResources(),
}
}
// CreatedResources is a map of resources (eg downloaded images) created by a driver
// that must be cleaned up.
type CreatedResources struct {
@ -177,7 +194,7 @@ type Driver interface {
// intialization steps like downloading images.
//
// CreatedResources may be non-nil even when an error occurs.
Prestart(*ExecContext, *structs.Task) (*CreatedResources, error)
Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error)
// Start is used to being task execution
Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error)
@ -226,7 +243,6 @@ type DriverContext struct {
config *config.Config
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnvironment
emitEvent LogEventFn
}
@ -242,14 +258,13 @@ func NewEmptyDriverContext() *DriverContext {
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName, allocID string, config *config.Config, node *structs.Node,
logger *log.Logger, taskEnv *env.TaskEnvironment, eventEmitter LogEventFn) *DriverContext {
logger *log.Logger, eventEmitter LogEventFn) *DriverContext {
return &DriverContext{
taskName: taskName,
allocID: allocID,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
emitEvent: eventEmitter,
}
}
@ -291,70 +306,19 @@ type ScriptExecutor interface {
type ExecContext struct {
// TaskDir contains information about the task directory structure.
TaskDir *allocdir.TaskDir
// TaskEnv contains the task's environment variables.
TaskEnv *env.TaskEnv
}
// NewExecContext is used to create a new execution context
func NewExecContext(td *allocdir.TaskDir) *ExecContext {
func NewExecContext(td *allocdir.TaskDir, te *env.TaskEnv) *ExecContext {
return &ExecContext{
TaskDir: td,
TaskEnv: te,
}
}
// GetTaskEnv converts the alloc dir, the node, task and alloc into a
// TaskEnvironment.
func GetTaskEnv(taskDir *allocdir.TaskDir, node *structs.Node,
task *structs.Task, alloc *structs.Allocation, conf *config.Config,
vaultToken string) (*env.TaskEnvironment, error) {
env := env.NewTaskEnvironment(node).
SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)).
SetJobName(alloc.Job.Name).
SetDatacenterName(node.Datacenter).
SetRegionName(conf.Region).
SetEnvvars(task.Env).
SetTaskName(task.Name)
// Vary paths by filesystem isolation used
drv, err := NewDriver(task.Driver, NewEmptyDriverContext())
if err != nil {
return nil, err
}
switch drv.FSIsolation() {
case cstructs.FSIsolationNone:
// Use host paths
env.SetAllocDir(taskDir.SharedAllocDir)
env.SetTaskLocalDir(taskDir.LocalDir)
env.SetSecretsDir(taskDir.SecretsDir)
default:
// filesystem isolation; use container paths
env.SetAllocDir(allocdir.SharedAllocContainerPath)
env.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
env.SetSecretsDir(allocdir.TaskSecretsContainerPath)
}
if task.Resources != nil {
env.SetMemLimit(task.Resources.MemoryMB).
SetCpuLimit(task.Resources.CPU).
SetNetworks(task.Resources.Networks)
}
if alloc != nil {
env.SetAlloc(alloc)
}
if task.Vault != nil {
env.SetVaultToken(vaultToken, task.Vault.Env)
}
// Set the host environment variables for non-image based drivers
if drv.FSIsolation() != cstructs.FSIsolationImage {
filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
env.AppendHostEnvvars(filter)
}
return env.Build(), nil
}
func mapMergeStrInt(maps ...map[string]int) map[string]int {
out := map[string]int{}
for _, in := range maps {

View file

@ -80,9 +80,10 @@ func testConfig() *config.Config {
}
type testContext struct {
AllocDir *allocdir.AllocDir
DriverCtx *DriverContext
ExecCtx *ExecContext
AllocDir *allocdir.AllocDir
DriverCtx *DriverContext
ExecCtx *ExecContext
EnvBuilder *env.Builder
}
// testDriverContext sets up an alloc dir, task dir, DriverContext, and ExecContext.
@ -112,23 +113,17 @@ func testDriverContexts(t *testing.T, task *structs.Task) *testContext {
t.Fatalf("TaskDir.Build(%#v, %q) failed: %v", config.DefaultChrootEnv, tmpdrv.FSIsolation(), err)
return nil
}
execCtx := NewExecContext(td)
taskEnv, err := GetTaskEnv(td, cfg.Node, task, alloc, cfg, "")
if err != nil {
allocDir.Destroy()
t.Fatalf("GetTaskEnv() failed: %v", err)
return nil
}
eb := env.NewBuilder(cfg.Node, alloc, task, cfg.Region)
SetEnvvars(eb, tmpdrv.FSIsolation(), td, cfg)
execCtx := NewExecContext(td, eb.Build())
logger := testLogger()
emitter := func(m string, args ...interface{}) {
logger.Printf("[EVENT] "+m, args...)
}
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, logger, taskEnv, emitter)
driverCtx := NewDriverContext(task.Name, alloc.ID, cfg, cfg.Node, logger, emitter)
return &testContext{allocDir, driverCtx, execCtx}
return &testContext{allocDir, driverCtx, execCtx, eb}
}
// setupTaskEnv creates a test env for GetTaskEnv testing. Returns task dir,
@ -165,10 +160,12 @@ func setupTaskEnv(t *testing.T, driver string) (*allocdir.TaskDir, map[string]st
conf := testConfig()
allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(conf.AllocDir, alloc.ID))
taskDir := allocDir.NewTaskDir(task.Name)
env, err := GetTaskEnv(taskDir, conf.Node, task, alloc, conf, "")
eb := env.NewBuilder(conf.Node, alloc, task, conf.Region)
tmpDriver, err := NewDriver(driver, NewEmptyDriverContext())
if err != nil {
t.Fatalf("GetTaskEnv() failed: %v", err)
t.Fatalf("unable to create driver %q: %v", driver, err)
}
SetEnvvars(eb, tmpDriver.FSIsolation(), taskDir, conf)
exp := map[string]string{
"NOMAD_CPU_LIMIT": "1000",
"NOMAD_MEMORY_LIMIT": "500",
@ -216,7 +213,7 @@ func setupTaskEnv(t *testing.T, driver string) (*allocdir.TaskDir, map[string]st
"NOMAD_REGION": "global",
}
act := env.EnvMap()
act := eb.Build().Map()
return taskDir, exp, act
}
@ -275,9 +272,9 @@ func TestDriver_GetTaskEnv_Chroot(t *testing.T) {
}
}
// TestDriver_GetTaskEnv_Image ensures host environment variables are not set
// TestDriver_TaskEnv_Image ensures host environment variables are not set
// for image based drivers. See #2211
func TestDriver_GetTaskEnv_Image(t *testing.T) {
func TestDriver_TaskEnv_Image(t *testing.T) {
_, exp, act := setupTaskEnv(t, "docker")
exp[env.AllocDir] = allocdir.SharedAllocContainerPath

View file

@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"github.com/hashicorp/nomad/helper"
hargs "github.com/hashicorp/nomad/helper/args"
@ -89,464 +90,431 @@ const (
nodeMetaPrefix = "meta."
)
// TaskEnvironment is used to expose information to a task via environment
// variables and provide interpolation of Nomad variables.
type TaskEnvironment struct {
Env map[string]string
TaskMeta map[string]string
AllocDir string
TaskDir string
SecretsDir string
CpuLimit int
MemLimit int
TaskName string
AllocIndex int
Datacenter string
Region string
AllocId string
AllocName string
Node *structs.Node
Networks []*structs.NetworkResource
PortMap map[string]int
VaultToken string
InjectVaultToken bool
JobName string
Alloc *structs.Allocation
// TaskEnv is a task's environment as well as node attribute's for
// interpolation.
type TaskEnv struct {
// NodeAttrs is the map of node attributes for interpolation
NodeAttrs map[string]string
// taskEnv is the variables that will be set in the tasks environment
TaskEnv map[string]string
// EnvMap is the map of environment variables
EnvMap map[string]string
// nodeValues is the values that are allowed for interprolation from the
// node.
NodeValues map[string]string
// envList is a memoized list created by List()
envList []string
}
func NewTaskEnvironment(node *structs.Node) *TaskEnvironment {
return &TaskEnvironment{Node: node, AllocIndex: -1}
// NewTaskEnv creates a new task environment with the given environment and
// node attribute maps.
func NewTaskEnv(env, node map[string]string) *TaskEnv {
return &TaskEnv{
NodeAttrs: node,
EnvMap: env,
}
}
// ParseAndReplace takes the user supplied args replaces any instance of an
// environment variable or nomad variable in the args with the actual value.
func (t *TaskEnvironment) ParseAndReplace(args []string) []string {
replaced := make([]string, len(args))
for i, arg := range args {
replaced[i] = hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
// List returns the task's environment as a slice of NAME=value pair strings.
func (t *TaskEnv) List() []string {
if t.envList != nil {
return t.envList
}
return replaced
}
// ReplaceEnv takes an arg and replaces all occurrences of environment variables
// and nomad variables. If the variable is found in the passed map it is
// replaced, otherwise the original string is returned.
func (t *TaskEnvironment) ReplaceEnv(arg string) string {
return hargs.ReplaceEnv(arg, t.TaskEnv, t.NodeValues)
}
// Build must be called after all the tasks environment values have been set.
func (t *TaskEnvironment) Build() *TaskEnvironment {
t.NodeValues = make(map[string]string)
t.TaskEnv = make(map[string]string)
// Build the meta
for k, v := range t.TaskMeta {
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
}
// Build the ports
for _, network := range t.Networks {
for label, value := range network.MapLabelToValues(nil) {
t.TaskEnv[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP
t.TaskEnv[fmt.Sprintf("%s%s", HostPortPrefix, label)] = strconv.Itoa(value)
if forwardedPort, ok := t.PortMap[label]; ok {
value = forwardedPort
}
t.TaskEnv[fmt.Sprintf("%s%s", PortPrefix, label)] = strconv.Itoa(value)
IPPort := net.JoinHostPort(network.IP, strconv.Itoa(value))
t.TaskEnv[fmt.Sprintf("%s%s", AddrPrefix, label)] = IPPort
}
}
// Build the directories
if t.AllocDir != "" {
t.TaskEnv[AllocDir] = t.AllocDir
}
if t.TaskDir != "" {
t.TaskEnv[TaskLocalDir] = t.TaskDir
}
if t.SecretsDir != "" {
t.TaskEnv[SecretsDir] = t.SecretsDir
}
// Build the resource limits
if t.MemLimit != 0 {
t.TaskEnv[MemLimit] = strconv.Itoa(t.MemLimit)
}
if t.CpuLimit != 0 {
t.TaskEnv[CpuLimit] = strconv.Itoa(t.CpuLimit)
}
// Build the tasks ids
if t.AllocId != "" {
t.TaskEnv[AllocID] = t.AllocId
}
if t.AllocName != "" {
t.TaskEnv[AllocName] = t.AllocName
}
if t.AllocIndex != -1 {
t.TaskEnv[AllocIndex] = strconv.Itoa(t.AllocIndex)
}
if t.TaskName != "" {
t.TaskEnv[TaskName] = t.TaskName
}
if t.JobName != "" {
t.TaskEnv[JobName] = t.JobName
}
if t.Datacenter != "" {
t.TaskEnv[Datacenter] = t.Datacenter
}
if t.Region != "" {
t.TaskEnv[Region] = t.Region
}
// Build the addr of the other tasks
if t.Alloc != nil {
for taskName, resources := range t.Alloc.TaskResources {
if taskName == t.TaskName {
continue
}
for _, nw := range resources.Networks {
ports := make([]structs.Port, 0, len(nw.ReservedPorts)+len(nw.DynamicPorts))
for _, port := range nw.ReservedPorts {
ports = append(ports, port)
}
for _, port := range nw.DynamicPorts {
ports = append(ports, port)
}
for _, p := range ports {
key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, p.Label)
t.TaskEnv[key] = fmt.Sprintf("%s:%d", nw.IP, p.Value)
key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, p.Label)
t.TaskEnv[key] = nw.IP
key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, p.Label)
t.TaskEnv[key] = strconv.Itoa(p.Value)
}
}
}
}
// Build the node
if t.Node != nil {
// Set up the node values.
t.NodeValues[nodeIdKey] = t.Node.ID
t.NodeValues[nodeDcKey] = t.Node.Datacenter
t.NodeValues[nodeRegionKey] = t.Region
t.NodeValues[nodeNameKey] = t.Node.Name
t.NodeValues[nodeClassKey] = t.Node.NodeClass
// Set up the attributes.
for k, v := range t.Node.Attributes {
t.NodeValues[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
}
// Set up the meta.
for k, v := range t.Node.Meta {
t.NodeValues[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
}
}
// Build the Vault Token
if t.InjectVaultToken && t.VaultToken != "" {
t.TaskEnv[VaultToken] = t.VaultToken
}
// Interpret the environment variables
interpreted := make(map[string]string, len(t.Env))
for k, v := range t.Env {
interpreted[k] = hargs.ReplaceEnv(v, t.NodeValues, t.TaskEnv)
}
for k, v := range interpreted {
t.TaskEnv[k] = v
}
// Clean keys (see #2405)
cleanedEnv := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
cleanedK := helper.CleanEnvVar(k, '_')
cleanedEnv[cleanedK] = v
}
t.TaskEnv = cleanedEnv
return t
}
// EnvList returns a list of strings with NAME=value pairs.
func (t *TaskEnvironment) EnvList() []string {
env := []string{}
for k, v := range t.TaskEnv {
for k, v := range t.EnvMap {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
return env
}
// EnvMap returns a copy of the tasks environment variables.
func (t *TaskEnvironment) EnvMap() map[string]string {
m := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
// Map of the task's environment variables.
func (t *TaskEnv) Map() map[string]string {
m := make(map[string]string, len(t.EnvMap))
for k, v := range t.EnvMap {
m[k] = v
}
return m
}
// EnvMapAll returns the environment variables that will be set as well as node
// meta/attrs in the map. This is appropriate for interpolation.
func (t *TaskEnvironment) EnvMapAll() map[string]string {
m := make(map[string]string, len(t.TaskEnv))
for k, v := range t.TaskEnv {
// All of the task's environment variables and the node's attributes in a
// single map.
func (t *TaskEnv) All() map[string]string {
m := make(map[string]string, len(t.EnvMap)+len(t.NodeAttrs))
for k, v := range t.EnvMap {
m[k] = v
}
for k, v := range t.NodeValues {
for k, v := range t.NodeAttrs {
m[k] = v
}
return m
}
// Builder methods to build the TaskEnvironment
func (t *TaskEnvironment) SetAllocDir(dir string) *TaskEnvironment {
t.AllocDir = dir
return t
}
func (t *TaskEnvironment) ClearAllocDir() *TaskEnvironment {
t.AllocDir = ""
return t
}
func (t *TaskEnvironment) SetTaskLocalDir(dir string) *TaskEnvironment {
t.TaskDir = dir
return t
}
func (t *TaskEnvironment) ClearTaskLocalDir() *TaskEnvironment {
t.TaskDir = ""
return t
}
func (t *TaskEnvironment) SetSecretsDir(dir string) *TaskEnvironment {
t.SecretsDir = dir
return t
}
func (t *TaskEnvironment) ClearSecretsDir() *TaskEnvironment {
t.SecretsDir = ""
return t
}
func (t *TaskEnvironment) SetMemLimit(limit int) *TaskEnvironment {
t.MemLimit = limit
return t
}
func (t *TaskEnvironment) ClearMemLimit() *TaskEnvironment {
t.MemLimit = 0
return t
}
func (t *TaskEnvironment) SetCpuLimit(limit int) *TaskEnvironment {
t.CpuLimit = limit
return t
}
func (t *TaskEnvironment) ClearCpuLimit() *TaskEnvironment {
t.CpuLimit = 0
return t
}
func (t *TaskEnvironment) SetNetworks(networks []*structs.NetworkResource) *TaskEnvironment {
t.Networks = networks
return t
}
func (t *TaskEnvironment) clearNetworks() *TaskEnvironment {
t.Networks = nil
return t
}
func (t *TaskEnvironment) SetPortMap(portMap map[string]int) *TaskEnvironment {
t.PortMap = portMap
return t
}
func (t *TaskEnvironment) clearPortMap() *TaskEnvironment {
t.PortMap = nil
return t
}
// Takes a map of meta values to be passed to the task. The keys are capatilized
// when the environent variable is set.
func (t *TaskEnvironment) SetTaskMeta(m map[string]string) *TaskEnvironment {
t.TaskMeta = m
return t
}
func (t *TaskEnvironment) ClearTaskMeta() *TaskEnvironment {
t.TaskMeta = nil
return t
}
func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment {
t.Env = m
return t
}
// Appends the given environment variables.
func (t *TaskEnvironment) AppendEnvvars(m map[string]string) *TaskEnvironment {
if t.Env == nil {
t.Env = make(map[string]string, len(m))
// ParseAndReplace takes the user supplied args replaces any instance of an
// environment variable or Nomad variable in the args with the actual value.
func (t *TaskEnv) ParseAndReplace(args []string) []string {
replaced := make([]string, len(args))
for i, arg := range args {
replaced[i] = hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs)
}
for k, v := range m {
t.Env[k] = v
}
return t
return replaced
}
// AppendHostEnvvars adds the host environment variables to the tasks. The
// filter parameter can be use to filter host environment from entering the
// tasks.
func (t *TaskEnvironment) AppendHostEnvvars(filter []string) *TaskEnvironment {
hostEnv := os.Environ()
if t.Env == nil {
t.Env = make(map[string]string, len(hostEnv))
// ReplaceEnv takes an arg and replaces all occurrences of environment variables
// and Nomad variables. If the variable is found in the passed map it is
// replaced, otherwise the original string is returned.
func (t *TaskEnv) ReplaceEnv(arg string) string {
return hargs.ReplaceEnv(arg, t.EnvMap, t.NodeAttrs)
}
// Builder is used to build task environment's and is safe for concurrent use.
type Builder struct {
// envvars are custom set environment variables
envvars map[string]string
// templateEnv are env vars set from templates
templateEnv map[string]string
// hostEnv are environment variables filtered from the host
hostEnv map[string]string
// nodeAttrs are Node attributes and metadata
nodeAttrs map[string]string
// taskMeta are the meta attributes on the task
taskMeta map[string]string
// allocDir from task's perspective; eg /alloc
allocDir string
// localDir from task's perspective; eg /local
localDir string
// secrestsDir from task's perspective; eg /secrets
secretsDir string
cpuLimit int
memLimit int
taskName string
allocIndex int
datacenter string
region string
allocId string
allocName string
portMap map[string]string
vaultToken string
injectVaultToken bool
jobName string
// otherPorts for tasks in the same alloc
otherPorts map[string]string
// network resources from the task; must be lazily turned into env vars
// because portMaps can change after builder creation and affect
// network env vars.
networks []*structs.NetworkResource
mu *sync.RWMutex
}
// NewBuilder creates a new task environment builder.
func NewBuilder(node *structs.Node, alloc *structs.Allocation, task *structs.Task, region string) *Builder {
b := &Builder{
region: region,
mu: &sync.RWMutex{},
}
return b.setTask(task).setAlloc(alloc).setNode(node)
}
// NewEmptyBuilder creates a new environment builder.
func NewEmptyBuilder() *Builder {
return &Builder{
mu: &sync.RWMutex{},
}
}
// Build must be called after all the tasks environment values have been set.
func (b *Builder) Build() *TaskEnv {
nodeAttrs := make(map[string]string)
envMap := make(map[string]string)
b.mu.RLock()
defer b.mu.RUnlock()
// Add the directories
if b.allocDir != "" {
envMap[AllocDir] = b.allocDir
}
if b.localDir != "" {
envMap[TaskLocalDir] = b.localDir
}
if b.secretsDir != "" {
envMap[SecretsDir] = b.secretsDir
}
// Index the filtered environment variables.
index := make(map[string]struct{}, len(filter))
// Add the resource limits
if b.memLimit != 0 {
envMap[MemLimit] = strconv.Itoa(b.memLimit)
}
if b.cpuLimit != 0 {
envMap[CpuLimit] = strconv.Itoa(b.cpuLimit)
}
// Add the task metadata
if b.allocId != "" {
envMap[AllocID] = b.allocId
}
if b.allocName != "" {
envMap[AllocName] = b.allocName
}
if b.allocIndex != -1 {
envMap[AllocIndex] = strconv.Itoa(b.allocIndex)
}
if b.taskName != "" {
envMap[TaskName] = b.taskName
}
if b.jobName != "" {
envMap[JobName] = b.jobName
}
if b.datacenter != "" {
envMap[Datacenter] = b.datacenter
}
if b.region != "" {
envMap[Region] = b.region
// Copy region over to node attrs
nodeAttrs[nodeRegionKey] = b.region
}
// Build the addrs for this task
for _, network := range b.networks {
for label, intVal := range network.MapLabelToValues(nil) {
value := strconv.Itoa(intVal)
envMap[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP
envMap[fmt.Sprintf("%s%s", HostPortPrefix, label)] = value
if forwardedPort, ok := b.portMap[label]; ok {
value = forwardedPort
}
envMap[fmt.Sprintf("%s%s", PortPrefix, label)] = value
ipPort := net.JoinHostPort(network.IP, value)
envMap[fmt.Sprintf("%s%s", AddrPrefix, label)] = ipPort
}
}
// Build the addr of the other tasks
for k, v := range b.otherPorts {
envMap[k] = v
}
// Build the Vault Token
if b.injectVaultToken && b.vaultToken != "" {
envMap[VaultToken] = b.vaultToken
}
// Copy task meta
for k, v := range b.taskMeta {
envMap[k] = v
}
// Copy node attributes
for k, v := range b.nodeAttrs {
nodeAttrs[k] = v
}
// Interpolate and add environment variables
for k, v := range b.hostEnv {
envMap[k] = hargs.ReplaceEnv(v, nodeAttrs, envMap)
}
// Copy interpolated task env vars second as they override host env vars
for k, v := range b.envvars {
envMap[k] = hargs.ReplaceEnv(v, nodeAttrs, envMap)
}
// Copy template env vars third as they override task env vars
for k, v := range b.templateEnv {
envMap[k] = v
}
// Clean keys (see #2405)
cleanedEnv := make(map[string]string, len(envMap))
for k, v := range envMap {
cleanedK := helper.CleanEnvVar(k, '_')
cleanedEnv[cleanedK] = v
}
return NewTaskEnv(cleanedEnv, nodeAttrs)
}
// Update task updates the environment based on a new alloc and task.
func (b *Builder) UpdateTask(alloc *structs.Allocation, task *structs.Task) *Builder {
b.mu.Lock()
defer b.mu.Unlock()
return b.setTask(task).setAlloc(alloc)
}
// setTask is called from NewBuilder to populate task related environment
// variables.
func (b *Builder) setTask(task *structs.Task) *Builder {
b.taskName = task.Name
b.envvars = make(map[string]string, len(task.Env))
for k, v := range task.Env {
b.envvars[k] = v
}
if task.Resources == nil {
b.memLimit = 0
b.cpuLimit = 0
b.networks = []*structs.NetworkResource{}
} else {
b.memLimit = task.Resources.MemoryMB
b.cpuLimit = task.Resources.CPU
// Copy networks to prevent sharing
b.networks = make([]*structs.NetworkResource, len(task.Resources.Networks))
for i, n := range task.Resources.Networks {
b.networks[i] = n.Copy()
}
}
return b
}
// setAlloc is called from NewBuilder to populate alloc related environment
// variables.
func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder {
b.allocId = alloc.ID
b.allocName = alloc.Name
b.allocIndex = alloc.Index()
b.jobName = alloc.Job.Name
// Set meta
combined := alloc.Job.CombinedTaskMeta(alloc.TaskGroup, b.taskName)
b.taskMeta = make(map[string]string, len(combined)*2)
for k, v := range combined {
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
b.taskMeta[fmt.Sprintf("%s%s", MetaPrefix, k)] = v
}
// Add ports from other tasks
b.otherPorts = make(map[string]string, len(alloc.TaskResources)*2)
for taskName, resources := range alloc.TaskResources {
if taskName == b.taskName {
continue
}
for _, nw := range resources.Networks {
for _, p := range nw.ReservedPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
for _, p := range nw.DynamicPorts {
addPort(b.otherPorts, taskName, nw.IP, p.Label, p.Value)
}
}
}
return b
}
// setNode is called from NewBuilder to populate node attributes.
func (b *Builder) setNode(n *structs.Node) *Builder {
b.nodeAttrs = make(map[string]string, 4+len(n.Attributes)+len(n.Meta))
b.nodeAttrs[nodeIdKey] = n.ID
b.nodeAttrs[nodeNameKey] = n.Name
b.nodeAttrs[nodeClassKey] = n.NodeClass
b.nodeAttrs[nodeDcKey] = n.Datacenter
b.datacenter = n.Datacenter
// Set up the attributes.
for k, v := range n.Attributes {
b.nodeAttrs[fmt.Sprintf("%s%s", nodeAttributePrefix, k)] = v
}
// Set up the meta.
for k, v := range n.Meta {
b.nodeAttrs[fmt.Sprintf("%s%s", nodeMetaPrefix, k)] = v
}
return b
}
func (b *Builder) SetAllocDir(dir string) *Builder {
b.mu.Lock()
b.allocDir = dir
b.mu.Unlock()
return b
}
func (b *Builder) SetTaskLocalDir(dir string) *Builder {
b.mu.Lock()
b.localDir = dir
b.mu.Unlock()
return b
}
func (b *Builder) SetSecretsDir(dir string) *Builder {
b.mu.Lock()
b.secretsDir = dir
b.mu.Unlock()
return b
}
func (b *Builder) SetPortMap(portMap map[string]int) *Builder {
newPortMap := make(map[string]string, len(portMap))
for k, v := range portMap {
newPortMap[k] = strconv.Itoa(v)
}
b.mu.Lock()
b.portMap = newPortMap
b.mu.Unlock()
return b
}
// SetHostEnvvars adds the host environment variables to the tasks. The filter
// parameter can be use to filter host environment from entering the tasks.
func (b *Builder) SetHostEnvvars(filter []string) *Builder {
filterMap := make(map[string]struct{}, len(filter))
for _, f := range filter {
index[f] = struct{}{}
filterMap[f] = struct{}{}
}
for _, e := range hostEnv {
fullHostEnv := os.Environ()
filteredHostEnv := make(map[string]string, len(fullHostEnv))
for _, e := range fullHostEnv {
parts := strings.SplitN(e, "=", 2)
key, value := parts[0], parts[1]
// Skip filtered environment variables
if _, filtered := index[key]; filtered {
if _, filtered := filterMap[key]; filtered {
continue
}
// Don't override the tasks environment variables.
if _, existing := t.Env[key]; !existing {
t.Env[key] = value
}
filteredHostEnv[key] = value
}
return t
b.mu.Lock()
b.hostEnv = filteredHostEnv
b.mu.Unlock()
return b
}
func (t *TaskEnvironment) ClearEnvvars() *TaskEnvironment {
t.Env = nil
return t
func (b *Builder) SetTemplateEnv(m map[string]string) *Builder {
b.mu.Lock()
b.templateEnv = m
b.mu.Unlock()
return b
}
// Helper method for setting all fields from an allocation.
func (t *TaskEnvironment) SetAlloc(alloc *structs.Allocation) *TaskEnvironment {
t.AllocId = alloc.ID
t.AllocName = alloc.Name
t.AllocIndex = alloc.Index()
t.Alloc = alloc
return t
func (b *Builder) SetVaultToken(token string, inject bool) *Builder {
b.mu.Lock()
b.vaultToken = token
b.injectVaultToken = inject
b.mu.Unlock()
return b
}
// Helper method for clearing all fields from an allocation.
func (t *TaskEnvironment) ClearAlloc(alloc *structs.Allocation) *TaskEnvironment {
return t.ClearAllocId().ClearAllocName().ClearAllocIndex()
}
func (t *TaskEnvironment) SetAllocIndex(index int) *TaskEnvironment {
t.AllocIndex = index
return t
}
func (t *TaskEnvironment) ClearAllocIndex() *TaskEnvironment {
t.AllocIndex = -1
return t
}
func (t *TaskEnvironment) SetAllocId(id string) *TaskEnvironment {
t.AllocId = id
return t
}
func (t *TaskEnvironment) ClearAllocId() *TaskEnvironment {
t.AllocId = ""
return t
}
func (t *TaskEnvironment) SetAllocName(name string) *TaskEnvironment {
t.AllocName = name
return t
}
func (t *TaskEnvironment) ClearAllocName() *TaskEnvironment {
t.AllocName = ""
return t
}
func (t *TaskEnvironment) SetTaskName(name string) *TaskEnvironment {
t.TaskName = name
return t
}
func (t *TaskEnvironment) ClearTaskName() *TaskEnvironment {
t.TaskName = ""
return t
}
func (t *TaskEnvironment) SetJobName(name string) *TaskEnvironment {
t.JobName = name
return t
}
func (t *TaskEnvironment) ClearJobName() *TaskEnvironment {
t.JobName = ""
return t
}
func (t *TaskEnvironment) SetDatacenterName(name string) *TaskEnvironment {
t.Datacenter = name
return t
}
func (t *TaskEnvironment) ClearDatacenterName() *TaskEnvironment {
t.Datacenter = ""
return t
}
func (t *TaskEnvironment) SetRegionName(name string) *TaskEnvironment {
t.Region = name
return t
}
func (t *TaskEnvironment) ClearRegionName() *TaskEnvironment {
t.Region = ""
return t
}
func (t *TaskEnvironment) SetVaultToken(token string, inject bool) *TaskEnvironment {
t.VaultToken = token
t.InjectVaultToken = inject
return t
}
func (t *TaskEnvironment) ClearVaultToken() *TaskEnvironment {
t.VaultToken = ""
t.InjectVaultToken = false
return t
// addPort keys and values for other tasks to an env var map
func addPort(m map[string]string, taskName, ip, portLabel string, port int) {
key := fmt.Sprintf("%s%s_%s", AddrPrefix, taskName, portLabel)
m[key] = fmt.Sprintf("%s:%d", ip, port)
key = fmt.Sprintf("%s%s_%s", IpPrefix, taskName, portLabel)
m[key] = ip
key = fmt.Sprintf("%s%s_%s", PortPrefix, taskName, portLabel)
m[key] = strconv.Itoa(port)
}

View file

@ -29,20 +29,13 @@ const (
)
var (
// Networks that tests can rely on
networks = []*structs.NetworkResource{
&structs.NetworkResource{
IP: "127.0.0.1",
ReservedPorts: []structs.Port{{Label: "http", Value: 80}},
DynamicPorts: []structs.Port{{Label: "https", Value: 8080}},
},
}
// portMap for use in tests as its set after Builder creation
portMap = map[string]int{
"https": 443,
}
)
func testTaskEnvironment() *TaskEnvironment {
func testEnvBuilder() *Builder {
n := mock.Node()
n.Attributes = map[string]string{
attrKey: attrVal,
@ -53,18 +46,19 @@ func testTaskEnvironment() *TaskEnvironment {
n.Name = nodeName
n.NodeClass = nodeClass
envvars := map[string]string{
task := mock.Job().TaskGroups[0].Tasks[0]
task.Env = map[string]string{
envOneKey: envOneVal,
envTwoKey: envTwoVal,
}
return NewTaskEnvironment(n).SetEnvvars(envvars).Build()
return NewBuilder(n, mock.Alloc(), task, "global")
}
func TestEnvironment_ParseAndReplace_Env(t *testing.T) {
env := testTaskEnvironment()
env := testEnvBuilder()
input := []string{fmt.Sprintf(`"${%v}"!`, envOneKey), fmt.Sprintf("${%s}${%s}", envOneKey, envTwoKey)}
act := env.ParseAndReplace(input)
act := env.Build().ParseAndReplace(input)
exp := []string{fmt.Sprintf(`"%s"!`, envOneVal), fmt.Sprintf("%s%s", envOneVal, envTwoVal)}
if !reflect.DeepEqual(act, exp) {
@ -75,8 +69,8 @@ func TestEnvironment_ParseAndReplace_Env(t *testing.T) {
func TestEnvironment_ParseAndReplace_Meta(t *testing.T) {
input := []string{fmt.Sprintf("${%v%v}", nodeMetaPrefix, metaKey)}
exp := []string{metaVal}
env := testTaskEnvironment()
act := env.ParseAndReplace(input)
env := testEnvBuilder()
act := env.Build().ParseAndReplace(input)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("ParseAndReplace(%v) returned %#v; want %#v", input, act, exp)
@ -86,8 +80,8 @@ func TestEnvironment_ParseAndReplace_Meta(t *testing.T) {
func TestEnvironment_ParseAndReplace_Attr(t *testing.T) {
input := []string{fmt.Sprintf("${%v%v}", nodeAttributePrefix, attrKey)}
exp := []string{attrVal}
env := testTaskEnvironment()
act := env.ParseAndReplace(input)
env := testEnvBuilder()
act := env.Build().ParseAndReplace(input)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("ParseAndReplace(%v) returned %#v; want %#v", input, act, exp)
@ -97,8 +91,8 @@ func TestEnvironment_ParseAndReplace_Attr(t *testing.T) {
func TestEnvironment_ParseAndReplace_Node(t *testing.T) {
input := []string{fmt.Sprintf("${%v}", nodeNameKey), fmt.Sprintf("${%v}", nodeClassKey)}
exp := []string{nodeName, nodeClass}
env := testTaskEnvironment()
act := env.ParseAndReplace(input)
env := testEnvBuilder()
act := env.Build().ParseAndReplace(input)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("ParseAndReplace(%v) returned %#v; want %#v", input, act, exp)
@ -116,8 +110,8 @@ func TestEnvironment_ParseAndReplace_Mixed(t *testing.T) {
fmt.Sprintf("%v%v", nodeClass, metaVal),
fmt.Sprintf("%v%v", envTwoVal, nodeClass),
}
env := testTaskEnvironment()
act := env.ParseAndReplace(input)
env := testEnvBuilder()
act := env.Build().ParseAndReplace(input)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("ParseAndReplace(%v) returned %#v; want %#v", input, act, exp)
@ -127,8 +121,8 @@ func TestEnvironment_ParseAndReplace_Mixed(t *testing.T) {
func TestEnvironment_ReplaceEnv_Mixed(t *testing.T) {
input := fmt.Sprintf("${%v}${%v%v}", nodeNameKey, nodeAttributePrefix, attrKey)
exp := fmt.Sprintf("%v%v", nodeName, attrVal)
env := testTaskEnvironment()
act := env.ReplaceEnv(input)
env := testEnvBuilder()
act := env.Build().ReplaceEnv(input)
if act != exp {
t.Fatalf("ParseAndReplace(%v) returned %#v; want %#v", input, act, exp)
@ -137,6 +131,9 @@ func TestEnvironment_ReplaceEnv_Mixed(t *testing.T) {
func TestEnvironment_AsList(t *testing.T) {
n := mock.Node()
n.Meta = map[string]string{
"metaKey": "metaVal",
}
a := mock.Alloc()
a.Resources.Networks[0].ReservedPorts = append(a.Resources.Networks[0].ReservedPorts,
structs.Port{Label: "ssh", Value: 22},
@ -156,15 +153,22 @@ func TestEnvironment_AsList(t *testing.T) {
},
},
}
env := NewTaskEnvironment(n).
SetNetworks(networks).
SetPortMap(portMap).
SetTaskMeta(map[string]string{"foo": "baz"}).
SetAlloc(a).
SetTaskName("taskA").Build()
task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{
"taskEnvKey": "taskEnvVal",
}
task.Resources.Networks = []*structs.NetworkResource{
&structs.NetworkResource{
IP: "127.0.0.1",
ReservedPorts: []structs.Port{{Label: "http", Value: 80}},
DynamicPorts: []structs.Port{{Label: "https", Value: 8080}},
},
}
env := NewBuilder(n, a, task, "global").SetPortMap(map[string]int{"https": 443})
act := env.EnvList()
act := env.Build().List()
exp := []string{
"taskEnvKey=taskEnvVal",
"NOMAD_ADDR_http=127.0.0.1:80",
"NOMAD_PORT_http=80",
"NOMAD_IP_http=127.0.0.1",
@ -173,110 +177,112 @@ func TestEnvironment_AsList(t *testing.T) {
"NOMAD_IP_https=127.0.0.1",
"NOMAD_HOST_PORT_http=80",
"NOMAD_HOST_PORT_https=8080",
"NOMAD_META_FOO=baz",
"NOMAD_META_foo=baz",
"NOMAD_ADDR_web_main=192.168.0.100:5000",
"NOMAD_ADDR_web_http=192.168.0.100:2000",
"NOMAD_PORT_web_main=5000",
"NOMAD_PORT_web_http=2000",
"NOMAD_IP_web_main=192.168.0.100",
"NOMAD_IP_web_http=192.168.0.100",
"NOMAD_TASK_NAME=taskA",
"NOMAD_TASK_NAME=web",
"NOMAD_ADDR_ssh_other=192.168.0.100:1234",
"NOMAD_ADDR_ssh_ssh=192.168.0.100:22",
"NOMAD_IP_ssh_other=192.168.0.100",
"NOMAD_IP_ssh_ssh=192.168.0.100",
"NOMAD_PORT_ssh_other=1234",
"NOMAD_PORT_ssh_ssh=22",
"NOMAD_CPU_LIMIT=500",
"NOMAD_DC=dc1",
"NOMAD_REGION=global",
"NOMAD_MEMORY_LIMIT=256",
"NOMAD_META_ELB_CHECK_INTERVAL=30s",
"NOMAD_META_ELB_CHECK_MIN=3",
"NOMAD_META_ELB_CHECK_TYPE=http",
"NOMAD_META_FOO=bar",
"NOMAD_META_OWNER=armon",
"NOMAD_META_elb_check_interval=30s",
"NOMAD_META_elb_check_min=3",
"NOMAD_META_elb_check_type=http",
"NOMAD_META_foo=bar",
"NOMAD_META_owner=armon",
"NOMAD_JOB_NAME=my-job",
fmt.Sprintf("NOMAD_ALLOC_ID=%s", a.ID),
}
sort.Strings(act)
sort.Strings(exp)
if len(act) != len(exp) {
t.Fatalf("wat: %d != %d", len(act), len(exp))
t.Fatalf("expected %d vars != %d actual, actual: %s\n\nexpected: %s\n",
len(act), len(exp), strings.Join(act, "\n"), strings.Join(exp, "\n"))
}
for i := range act {
if act[i] != exp[i] {
t.Errorf("%d %q != %q", i, act[i], exp[i])
t.Errorf("%d actual %q != %q expected", i, act[i], exp[i])
}
}
}
func TestEnvironment_VaultToken(t *testing.T) {
n := mock.Node()
env := NewTaskEnvironment(n).SetVaultToken("123", false).Build()
a := mock.Alloc()
env := NewBuilder(n, a, a.Job.TaskGroups[0].Tasks[0], "global")
env.SetVaultToken("123", false)
act := env.EnvList()
if len(act) != 0 {
t.Fatalf("Unexpected environment variables: %v", act)
{
act := env.Build().All()
if act[VaultToken] != "" {
t.Fatalf("Unexpected environment variables: %s=%q", VaultToken, act[VaultToken])
}
}
env = env.SetVaultToken("123", true).Build()
act = env.EnvList()
exp := []string{"VAULT_TOKEN=123"}
if !reflect.DeepEqual(act, exp) {
t.Fatalf("env.List() returned %v; want %v", act, exp)
{
act := env.SetVaultToken("123", true).Build().List()
exp := "VAULT_TOKEN=123"
found := false
for _, entry := range act {
if entry == exp {
found = true
break
}
}
if !found {
t.Fatalf("did not find %q in:\n%s", exp, strings.Join(act, "\n"))
}
}
}
func TestEnvironment_ClearEnvvars(t *testing.T) {
func TestEnvironment_Envvars(t *testing.T) {
envMap := map[string]string{"foo": "baz", "bar": "bang"}
n := mock.Node()
env := NewTaskEnvironment(n).
SetNetworks(networks).
SetPortMap(portMap).
SetEnvvars(map[string]string{"foo": "baz", "bar": "bang"}).Build()
act := env.EnvList()
exp := []string{
"NOMAD_ADDR_http=127.0.0.1:80",
"NOMAD_PORT_http=80",
"NOMAD_IP_http=127.0.0.1",
"NOMAD_ADDR_https=127.0.0.1:443",
"NOMAD_PORT_https=443",
"NOMAD_IP_https=127.0.0.1",
"NOMAD_HOST_PORT_http=80",
"NOMAD_HOST_PORT_https=8080",
"bar=bang",
"foo=baz",
}
sort.Strings(act)
sort.Strings(exp)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("env.List() returned %v; want %v", act, exp)
}
// Clear the environent variables.
env.ClearEnvvars().Build()
act = env.EnvList()
exp = []string{
"NOMAD_ADDR_http=127.0.0.1:80",
"NOMAD_PORT_http=80",
"NOMAD_IP_http=127.0.0.1",
"NOMAD_ADDR_https=127.0.0.1:443",
"NOMAD_PORT_https=443",
"NOMAD_IP_https=127.0.0.1",
"NOMAD_HOST_PORT_https=8080",
"NOMAD_HOST_PORT_http=80",
}
sort.Strings(act)
sort.Strings(exp)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("env.List() returned %v; want %v", act, exp)
a := mock.Alloc()
task := a.Job.TaskGroups[0].Tasks[0]
task.Env = envMap
act := NewBuilder(n, a, task, "global").SetPortMap(portMap).Build().All()
for k, v := range envMap {
actV, ok := act[k]
if !ok {
t.Fatalf("missing %q in %#v", k, act)
}
if v != actV {
t.Fatalf("expected %s=%q but found %q", k, v, actV)
}
}
}
func TestEnvironment_Interpolate(t *testing.T) {
env := testTaskEnvironment().
SetEnvvars(map[string]string{"test": "${node.class}", "test2": "${attr.arch}"}).
Build()
n := mock.Node()
n.Attributes["arch"] = "x86"
n.NodeClass = "test class"
a := mock.Alloc()
task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{"test": "${node.class}", "test2": "${attr.arch}"}
env := NewBuilder(n, a, task, "global").Build()
act := env.EnvList()
exp := []string{fmt.Sprintf("test=%s", nodeClass), fmt.Sprintf("test2=%s", attrVal)}
sort.Strings(act)
sort.Strings(exp)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("env.List() returned %v; want %v", act, exp)
exp := []string{fmt.Sprintf("test=%s", n.NodeClass), fmt.Sprintf("test2=%s", n.Attributes["arch"])}
found1, found2 := false, false
for _, entry := range env.List() {
switch entry {
case exp[0]:
found1 = true
case exp[1]:
found2 = true
}
}
if !found1 || !found2 {
t.Fatalf("expected to find %q and %q but got:\n%s",
exp[0], exp[1], strings.Join(env.List(), "\n"))
}
}
@ -286,11 +292,11 @@ func TestEnvironment_AppendHostEnvvars(t *testing.T) {
t.Skip("No host environment variables. Can't test")
}
skip := strings.Split(host[0], "=")[0]
env := testTaskEnvironment().
AppendHostEnvvars([]string{skip}).
env := testEnvBuilder().
SetHostEnvvars([]string{skip}).
Build()
act := env.EnvMap()
act := env.Map()
if len(act) < 1 {
t.Fatalf("Host environment variables not properly set")
}
@ -303,21 +309,60 @@ func TestEnvironment_AppendHostEnvvars(t *testing.T) {
// converted to underscores in environment variables.
// See: https://github.com/hashicorp/nomad/issues/2405
func TestEnvironment_DashesInTaskName(t *testing.T) {
env := testTaskEnvironment()
env.SetNetworks([]*structs.NetworkResource{
{
Device: "eth0",
DynamicPorts: []structs.Port{
{
Label: "just-some-dashes",
Value: 9000,
},
},
},
})
env.Build()
a := mock.Alloc()
task := a.Job.TaskGroups[0].Tasks[0]
task.Env = map[string]string{"test-one-two": "three-four"}
envMap := NewBuilder(mock.Node(), a, task, "global").Build().Map()
if env.TaskEnv["NOMAD_PORT_just_some_dashes"] != "9000" {
t.Fatalf("Expected NOMAD_PORT_just_some_dashes=9000 in TaskEnv; found:\n%#v", env.TaskEnv)
if envMap["test_one_two"] != "three-four" {
t.Fatalf("Expected test_one_two=three-four in TaskEnv; found:\n%#v", envMap)
}
}
// TestEnvironment_UpdateTask asserts env vars and task meta are updated when a
// task is updated.
func TestEnvironment_UpdateTask(t *testing.T) {
a := mock.Alloc()
a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta": "tgmetaval"}
task := a.Job.TaskGroups[0].Tasks[0]
task.Name = "orig"
task.Env = map[string]string{"taskenv": "taskenvval"}
task.Meta = map[string]string{"taskmeta": "taskmetaval"}
builder := NewBuilder(mock.Node(), a, task, "global")
origMap := builder.Build().Map()
if origMap["NOMAD_TASK_NAME"] != "orig" {
t.Errorf("Expected NOMAD_TASK_NAME=orig but found %q", origMap["NOMAD_TASK_NAME"])
}
if origMap["NOMAD_META_taskmeta"] != "taskmetaval" {
t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", origMap["NOMAD_META_taskmeta"])
}
if origMap["taskenv"] != "taskenvval" {
t.Errorf("Expected taskenv=taskenvva but found %q", origMap["taskenv"])
}
if origMap["NOMAD_META_tgmeta"] != "tgmetaval" {
t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", origMap["NOMAD_META_tgmeta"])
}
a.Job.TaskGroups[0].Meta = map[string]string{"tgmeta2": "tgmetaval2"}
task.Name = "new"
task.Env = map[string]string{"taskenv2": "taskenvval2"}
task.Meta = map[string]string{"taskmeta2": "taskmetaval2"}
newMap := builder.UpdateTask(a, task).Build().Map()
if newMap["NOMAD_TASK_NAME"] != "new" {
t.Errorf("Expected NOMAD_TASK_NAME=new but found %q", newMap["NOMAD_TASK_NAME"])
}
if newMap["NOMAD_META_taskmeta2"] != "taskmetaval2" {
t.Errorf("Expected NOMAD_META_taskmeta=taskmetaval but found %q", newMap["NOMAD_META_taskmeta2"])
}
if newMap["taskenv2"] != "taskenvval2" {
t.Errorf("Expected taskenv=taskenvva but found %q", newMap["taskenv2"])
}
if newMap["NOMAD_META_tgmeta2"] != "tgmetaval2" {
t.Errorf("Expected NOMAD_META_tgmeta=tgmetaval but found %q", newMap["NOMAD_META_tgmeta2"])
}
if v, ok := newMap["NOMAD_META_taskmeta"]; ok {
t.Errorf("Expected NOMAD_META_taskmeta to be unset but found: %q", v)
}
}

View file

@ -98,7 +98,7 @@ func (d *ExecDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *ExecDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@ -124,7 +124,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: ctx.TaskEnv,
Driver: "exec",
AllocID: d.DriverContext.allocID,
LogDir: ctx.TaskDir.LogDir,

View file

@ -66,7 +66,7 @@ type Executor interface {
// wants to run and isolate it
type ExecutorContext struct {
// TaskEnv holds information about the environment of a Task
TaskEnv *env.TaskEnvironment
TaskEnv *env.TaskEnv
// Task is the task whose executor is being launched
Task *structs.Task
@ -229,7 +229,6 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
// set the task dir as the working directory for the command
e.cmd.Dir = e.ctx.TaskDir
e.ctx.TaskEnv.Build()
// configuring the chroot, resource container, and start the plugin
// process in the chroot.
if err := e.configureIsolation(); err != nil {
@ -274,7 +273,7 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand) (*ProcessState, erro
// Set the commands arguments
e.cmd.Path = path
e.cmd.Args = append([]string{e.cmd.Path}, e.ctx.TaskEnv.ParseAndReplace(command.Args)...)
e.cmd.Env = e.ctx.TaskEnv.EnvList()
e.cmd.Env = e.ctx.TaskEnv.List()
// Start the process
if err := e.cmd.Start(); err != nil {
@ -295,7 +294,7 @@ func (e *UniversalExecutor) Exec(deadline time.Time, name string, args []string)
// ExecScript executes cmd with args and returns the output, exit code, and
// error. Output is truncated to client/driver/structs.CheckBufSize
func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs *syscall.SysProcAttr,
func ExecScript(ctx context.Context, dir string, env *env.TaskEnv, attrs *syscall.SysProcAttr,
name string, args []string) ([]byte, int, error) {
name = env.ReplaceEnv(name)
cmd := exec.CommandContext(ctx, name, env.ParseAndReplace(args)...)
@ -303,7 +302,7 @@ func ExecScript(ctx context.Context, dir string, env *env.TaskEnvironment, attrs
// Copy runtime environment from the main command
cmd.SysProcAttr = attrs
cmd.Dir = dir
cmd.Env = env.EnvList()
cmd.Env = env.List()
// Capture output
buf, _ := circbuf.NewBuffer(int64(dstructs.CheckBufSize))

View file

@ -37,9 +37,9 @@ func testExecutorContextWithChroot(t *testing.T) (*ExecutorContext, *allocdir.Al
"/foobar": "/does/not/exist",
}
taskEnv := env.NewTaskEnvironment(mock.Node())
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
taskEnv := env.NewBuilder(mock.Node(), alloc, task, "global").Build()
allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(os.TempDir(), alloc.ID))
if err := allocDir.Build(); err != nil {

View file

@ -40,9 +40,9 @@ func testLogger() *log.Logger {
//
// The caller is responsible for calling AllocDir.Destroy() to cleanup.
func testExecutorContext(t *testing.T) (*ExecutorContext, *allocdir.AllocDir) {
taskEnv := env.NewTaskEnvironment(mock.Node())
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
taskEnv := env.NewBuilder(mock.Node(), alloc, task, "global").Build()
allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(os.TempDir(), alloc.ID))
if err := allocDir.Build(); err != nil {

View file

@ -175,11 +175,11 @@ func (d *JavaDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *JavaDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDriverConfig, error) {
func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnv) (*JavaDriverConfig, error) {
var driverConfig JavaDriverConfig
if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
return nil, err
@ -203,7 +203,7 @@ func NewJavaDriverConfig(task *structs.Task, env *env.TaskEnvironment) (*JavaDri
}
func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
driverConfig, err := NewJavaDriverConfig(task, d.taskEnv)
driverConfig, err := NewJavaDriverConfig(task, ctx.TaskEnv)
if err != nil {
return nil, err
}
@ -249,7 +249,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Set the context
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: ctx.TaskEnv,
Driver: "java",
AllocID: d.DriverContext.allocID,
Task: task,

View file

@ -173,7 +173,7 @@ func (d *LxcDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, e
return true, nil
}
func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}

View file

@ -1,4 +1,4 @@
// +build nomad_test
//+build nomad_test
package driver
@ -84,7 +84,7 @@ func (d *MockDriver) FSIsolation() cstructs.FSIsolation {
return cstructs.FSIsolationNone
}
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *MockDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}

View file

@ -131,7 +131,7 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
return true, nil
}
func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *QemuDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@ -238,7 +238,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: ctx.TaskEnv,
Driver: "qemu",
AllocID: d.DriverContext.allocID,
Task: task,

View file

@ -50,7 +50,7 @@ type rawExecHandle struct {
logger *log.Logger
waitCh chan *dstructs.WaitResult
doneCh chan struct{}
taskEnv *env.TaskEnvironment
taskEnv *env.TaskEnv
taskDir *allocdir.TaskDir
}
@ -106,7 +106,7 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
return false, nil
}
func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*CreatedResources, error) {
func (d *RawExecDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@ -133,7 +133,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: ctx.TaskEnv,
Driver: "raw_exec",
AllocID: d.DriverContext.allocID,
Task: task,
@ -169,7 +169,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
logger: d.logger,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.taskEnv,
taskEnv: ctx.TaskEnv,
taskDir: ctx.TaskDir,
}
go h.run()
@ -218,7 +218,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e
version: id.Version,
doneCh: make(chan struct{}),
waitCh: make(chan *dstructs.WaitResult, 1),
taskEnv: d.taskEnv,
taskEnv: ctx.TaskEnv,
taskDir: ctx.TaskDir,
}
go h.run()

View file

@ -88,7 +88,7 @@ type RktDriverConfig struct {
// rktHandle is returned from Start/Open as a handle to the PID
type rktHandle struct {
uuid string
env *env.TaskEnvironment
env *env.TaskEnv
taskDir *allocdir.TaskDir
pluginClient *plugin.Client
executorPid int
@ -223,7 +223,7 @@ func (d *RktDriver) Periodic() (bool, time.Duration) {
return true, 15 * time.Second
}
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*CreatedResources, error) {
func (d *RktDriver) Prestart(ctx *ExecContext, task *structs.Task) (*PrestartResponse, error) {
return nil, nil
}
@ -240,7 +240,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
img := driverConfig.ImageName
// Build the command.
cmdArgs := make([]string, 0, 32)
cmdArgs := make([]string, 0, 50)
// Add debug option to rkt command.
debug := driverConfig.Debug
@ -310,7 +310,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
cmdArgs = append(cmdArgs, fmt.Sprintf("--debug=%t", debug))
// Inject environment variables
for k, v := range d.taskEnv.EnvMap() {
for k, v := range ctx.TaskEnv.Map() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--set-env=%v=%q", k, v))
}
@ -400,7 +400,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
// Add user passed arguments.
if len(driverConfig.Args) != 0 {
parsed := d.taskEnv.ParseAndReplace(driverConfig.Args)
parsed := ctx.TaskEnv.ParseAndReplace(driverConfig.Args)
// Need to start arguments with "--"
if len(parsed) > 0 {
@ -412,10 +412,6 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
}
// Set the host environment variables.
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
d.taskEnv.AppendHostEnvvars(filter)
pluginLogFile := filepath.Join(ctx.TaskDir.Dir, fmt.Sprintf("%s-executor.out", task.Name))
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
@ -426,8 +422,14 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
if err != nil {
return nil, err
}
// The task's environment is set via --set-env flags above, but the rkt
// command itself needs an evironment with PATH set to find iptables.
eb := env.NewEmptyBuilder()
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
rktEnv := eb.SetHostEnvvars(filter).Build()
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
TaskEnv: rktEnv,
Driver: "rkt",
AllocID: d.DriverContext.allocID,
Task: task,
@ -477,7 +479,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
maxKill := d.DriverContext.config.MaxKillTimeout
h := &rktHandle{
uuid: uuid,
env: d.taskEnv,
env: rktEnv,
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executor: execIntf,
@ -514,12 +516,18 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error
return nil, fmt.Errorf("error connecting to plugin: %v", err)
}
// The task's environment is set via --set-env flags in Start, but the rkt
// command itself needs an evironment with PATH set to find iptables.
eb := env.NewEmptyBuilder()
filter := strings.Split(d.config.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
rktEnv := eb.SetHostEnvvars(filter).Build()
ver, _ := exec.Version()
d.logger.Printf("[DEBUG] driver.rkt: version of executor: %v", ver.Version)
// Return a driver handle
h := &rktHandle{
uuid: id.UUID,
env: d.taskEnv,
env: rktEnv,
taskDir: ctx.TaskDir,
pluginClient: pluginClient,
executorPid: id.ExecutorPid,

View file

@ -12,9 +12,12 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -29,7 +32,7 @@ func cgroupsMounted(node *structs.Node) bool {
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(w io.Writer, clientConfig *config.Config,
executorConfig *cstructs.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
executorConfig *dstructs.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
@ -171,10 +174,33 @@ func GetAbsolutePath(bin string) (string, error) {
}
// getExecutorUser returns the user of the task, defaulting to
// cstructs.DefaultUnprivilegedUser if none was given.
// dstructs.DefaultUnprivilegedUser if none was given.
func getExecutorUser(task *structs.Task) string {
if task.User == "" {
return cstructs.DefaultUnpriviledgedUser
return dstructs.DefaultUnpriviledgedUser
}
return task.User
}
// SetEnvvars sets path and host env vars depending on the FS isolation used.
func SetEnvvars(envBuilder *env.Builder, fsi cstructs.FSIsolation, taskDir *allocdir.TaskDir, conf *config.Config) {
// Set driver-specific environment variables
switch fsi {
case cstructs.FSIsolationNone:
// Use host paths
envBuilder.SetAllocDir(taskDir.SharedAllocDir)
envBuilder.SetTaskLocalDir(taskDir.LocalDir)
envBuilder.SetSecretsDir(taskDir.SecretsDir)
default:
// filesystem isolation; use container paths
envBuilder.SetAllocDir(allocdir.SharedAllocContainerPath)
envBuilder.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
envBuilder.SetSecretsDir(allocdir.TaskSecretsContainerPath)
}
// Set the host environment variables for non-image based drivers
if fsi != cstructs.FSIsolationImage {
filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
envBuilder.SetHostEnvvars(filter)
}
}

View file

@ -8,7 +8,6 @@ import (
"sync"
gg "github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -27,6 +26,12 @@ const (
gitSSHPrefix = "git@github.com:"
)
// EnvReplacer is an interface which can interpolate environment variables and
// is usually satisfied by env.TaskEnv.
type EnvReplacer interface {
ReplaceEnv(string) string
}
// getClient returns a client that is suitable for Nomad downloading artifacts.
func getClient(src, dst string) *gg.Client {
lock.Lock()
@ -51,8 +56,7 @@ func getClient(src, dst string) *gg.Client {
}
// getGetterUrl returns the go-getter URL to download the artifact.
func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact) (string, error) {
taskEnv.Build()
func getGetterUrl(taskEnv EnvReplacer, artifact *structs.TaskArtifact) (string, error) {
source := taskEnv.ReplaceEnv(artifact.GetterSource)
// Handle an invalid URL when given a go-getter url such as
@ -85,7 +89,7 @@ func getGetterUrl(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact)
}
// GetArtifact downloads an artifact into the specified task directory.
func GetArtifact(taskEnv *env.TaskEnvironment, artifact *structs.TaskArtifact, taskDir string) error {
func GetArtifact(taskEnv EnvReplacer, artifact *structs.TaskArtifact, taskDir string) error {
url, err := getGetterUrl(taskEnv, artifact)
if err != nil {
return newGetError(artifact.GetterSource, err, false)

View file

@ -16,6 +16,15 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// fakeReplacer is a noop version of env.TaskEnv.ReplanceEnv
type fakeReplacer struct{}
func (fakeReplacer) ReplaceEnv(s string) string {
return s
}
var taskEnv = fakeReplacer{}
func TestGetArtifact_FileAndChecksum(t *testing.T) {
// Create the test server hosting the file to download
ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/"))))
@ -38,7 +47,6 @@ func TestGetArtifact_FileAndChecksum(t *testing.T) {
}
// Download the artifact
taskEnv := env.NewTaskEnvironment(mock.Node())
if err := GetArtifact(taskEnv, artifact, taskDir); err != nil {
t.Fatalf("GetArtifact failed: %v", err)
}
@ -73,7 +81,6 @@ func TestGetArtifact_File_RelativeDest(t *testing.T) {
}
// Download the artifact
taskEnv := env.NewTaskEnvironment(mock.Node())
if err := GetArtifact(taskEnv, artifact, taskDir); err != nil {
t.Fatalf("GetArtifact failed: %v", err)
}
@ -91,7 +98,11 @@ func TestGetGetterUrl_Interprolation(t *testing.T) {
}
url := "foo.com"
taskEnv := env.NewTaskEnvironment(mock.Node()).SetTaskMeta(map[string]string{"artifact": url})
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Meta = map[string]string{"artifact": url}
taskEnv := env.NewBuilder(mock.Node(), alloc, task, "global").Build()
act, err := getGetterUrl(taskEnv, artifact)
if err != nil {
t.Fatalf("getGetterUrl() failed: %v", err)
@ -124,7 +135,6 @@ func TestGetArtifact_InvalidChecksum(t *testing.T) {
}
// Download the artifact and expect an error
taskEnv := env.NewTaskEnvironment(mock.Node())
if err := GetArtifact(taskEnv, artifact, taskDir); err == nil {
t.Fatalf("GetArtifact should have failed")
}
@ -190,7 +200,6 @@ func TestGetArtifact_Archive(t *testing.T) {
},
}
taskEnv := env.NewTaskEnvironment(mock.Node())
if err := GetArtifact(taskEnv, artifact, taskDir); err != nil {
t.Fatalf("GetArtifact failed: %v", err)
}
@ -206,7 +215,6 @@ func TestGetArtifact_Archive(t *testing.T) {
}
func TestGetGetterUrl_Queries(t *testing.T) {
taskEnv := env.NewTaskEnvironment(mock.Node())
cases := []struct {
name string
artifact *structs.TaskArtifact

View file

@ -85,9 +85,8 @@ type TaskRunner struct {
task *structs.Task
taskDir *allocdir.TaskDir
// taskEnv is the environment variables of the task
taskEnv *env.TaskEnvironment
taskEnvLock sync.Mutex
// envBuilder is used to build the task's environment
envBuilder *env.Builder
// updateCh is used to receive updated versions of the allocation
updateCh chan *structs.Allocation
@ -215,6 +214,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
// Initialize the environment builder
envBuilder := env.NewBuilder(config.Node, alloc, task, config.Region)
tc := &TaskRunner{
config: config,
stateDB: stateDB,
@ -224,6 +226,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
alloc: alloc,
task: task,
taskDir: taskDir,
envBuilder: envBuilder,
createdResources: driver.NewCreatedResources(),
consul: consulClient,
vaultClient: vaultClient,
@ -310,11 +313,6 @@ func (r *TaskRunner) RestoreState() (string, error) {
r.payloadRendered = snap.PayloadRendered
r.setCreatedResources(snap.CreatedResources)
if err := r.setTaskEnv(); err != nil {
return "", fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
r.task.Name, r.alloc.ID, err)
}
if r.task.Vault != nil {
// Read the token from the secret directory
tokenPath := filepath.Join(r.taskDir.SecretsDir, vaultTokenFile)
@ -339,7 +337,7 @@ func (r *TaskRunner) RestoreState() (string, error) {
return "", err
}
ctx := driver.NewExecContext(r.taskDir)
ctx := driver.NewExecContext(r.taskDir, r.envBuilder.Build())
handle, err := d.Open(ctx, snap.HandleID)
// In the case it fails, we relaunch the task in the Run() method.
@ -480,35 +478,8 @@ func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
r.updater(r.task.Name, state, event)
}
// setTaskEnv sets the task environment. It returns an error if it could not be
// created.
func (r *TaskRunner) setTaskEnv() error {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
taskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node,
r.task.Copy(), r.alloc, r.config, r.vaultFuture.Get())
if err != nil {
return err
}
r.taskEnv = taskEnv
return nil
}
// getTaskEnv returns the task environment
func (r *TaskRunner) getTaskEnv() *env.TaskEnvironment {
r.taskEnvLock.Lock()
defer r.taskEnvLock.Unlock()
return r.taskEnv
}
// createDriver makes a driver for the task
func (r *TaskRunner) createDriver() (driver.Driver, error) {
env := r.getTaskEnv()
if env == nil {
return nil, fmt.Errorf("task environment not made for task %q in allocation %q", r.task.Name, r.alloc.ID)
}
// Create a task-specific event emitter callback to expose minimal
// state to drivers
eventEmitter := func(m string, args ...interface{}) {
@ -517,13 +488,14 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
}
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, env, eventEmitter)
driver, err := driver.NewDriver(r.task.Driver, driverCtx)
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, eventEmitter)
d, err := driver.NewDriver(r.task.Driver, driverCtx)
if err != nil {
return nil, fmt.Errorf("failed to create driver '%s' for alloc %s: %v",
r.task.Driver, r.alloc.ID, err)
}
return driver, err
return d, err
}
// Run is a long running routine used to manage the task
@ -532,15 +504,6 @@ func (r *TaskRunner) Run() {
r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')",
r.task.Name, r.alloc.ID)
// Create the initial environment, this will be recreated if a Vault token
// is needed
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
return
}
if err := r.validateTask(); err != nil {
r.setState(
structs.TaskStateDead,
@ -548,8 +511,10 @@ func (r *TaskRunner) Run() {
return
}
// Create a driver so that we can determine the FSIsolation required
drv, err := r.createDriver()
// Create a temporary driver so that we can determine the FSIsolation
// required. run->startTask will create a new driver after environment
// has been setup (env vars, templates, artifacts, secrets, etc).
tmpDrv, err := r.createDriver()
if err != nil {
e := fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err)
r.setState(
@ -561,7 +526,7 @@ func (r *TaskRunner) Run() {
// Build base task directory structure regardless of FS isolation abilities.
// This needs to happen before we start the Vault manager and call prestart
// as both those can write to the task directories
if err := r.buildTaskDir(drv.FSIsolation()); err != nil {
if err := r.buildTaskDir(tmpDrv.FSIsolation()); err != nil {
e := fmt.Errorf("failed to build task directory for %q: %v", r.task.Name, err)
r.setState(
structs.TaskStateDead,
@ -613,8 +578,9 @@ func (r *TaskRunner) validateTask() error {
}
// Validate the Service names
taskEnv := r.envBuilder.Build()
for i, service := range r.task.Services {
name := r.taskEnv.ReplaceEnv(service.Name)
name := taskEnv.ReplaceEnv(service.Name)
if err := service.ValidateName(name); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service (%d) failed validation: %v", i, err))
}
@ -851,12 +817,7 @@ func (r *TaskRunner) writeToken(token string) error {
func (r *TaskRunner) updatedTokenHandler() {
// Update the tasks environment
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
return
}
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env)
if r.templateManager != nil {
r.templateManager.Stop()
@ -864,7 +825,7 @@ func (r *TaskRunner) updatedTokenHandler() {
// Create a new templateManager
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv())
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
@ -888,14 +849,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
return
}
r.logger.Printf("[DEBUG] client: retrieved Vault token for task %v in alloc %q", r.task.Name, r.alloc.ID)
}
if err := r.setTaskEnv(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
resultCh <- false
return
r.envBuilder.SetVaultToken(r.vaultFuture.Get(), r.task.Vault.Env)
}
// If the job is a dispatch job and there is a payload write it to disk
@ -939,8 +893,9 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
// Download the task's artifacts
if !downloaded && len(r.task.Artifacts) > 0 {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
taskEnv := r.envBuilder.Build()
for _, artifact := range r.task.Artifacts {
if err := getter.GetArtifact(r.getTaskEnv(), artifact, r.taskDir.Dir); err != nil {
if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
r.logger.Printf("[DEBUG] client: %v", wrapped)
r.setState(structs.TaskStatePending,
@ -971,7 +926,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
if r.templateManager == nil {
var err error
r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates,
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.getTaskEnv())
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
@ -1219,7 +1174,7 @@ func (r *TaskRunner) cleanup() {
res := r.getCreatedResources()
ctx := driver.NewExecContext(r.taskDir)
ctx := driver.NewExecContext(r.taskDir, r.envBuilder.Build())
attempts := 1
var cleanupErr error
for retry := true; retry; attempts++ {
@ -1347,13 +1302,20 @@ func (r *TaskRunner) startTask() error {
}
// Run prestart
ctx := driver.NewExecContext(r.taskDir)
res, err := drv.Prestart(ctx, r.task)
ctx := driver.NewExecContext(r.taskDir, r.envBuilder.Build())
resp, err := drv.Prestart(ctx, r.task)
// Merge newly created resources into previously created resources
r.createdResourcesLock.Lock()
r.createdResources.Merge(res)
r.createdResourcesLock.Unlock()
if resp != nil {
r.createdResourcesLock.Lock()
r.createdResources.Merge(resp.CreatedResources)
r.createdResourcesLock.Unlock()
// Update environment with PortMap if it was returned
if len(resp.PortMap) > 0 {
r.envBuilder.SetPortMap(resp.PortMap)
}
}
if err != nil {
wrapped := fmt.Sprintf("failed to initialize task %q for alloc %q: %v",
@ -1362,6 +1324,9 @@ func (r *TaskRunner) startTask() error {
return structs.WrapRecoverable(wrapped, err)
}
// Create a new context for Start since the environment may have been updated.
ctx = driver.NewExecContext(r.taskDir, r.envBuilder.Build())
// Start the job
handle, err := drv.Start(ctx, r.task)
if err != nil {
@ -1399,13 +1364,13 @@ func (r *TaskRunner) registerServices(d driver.Driver, h driver.ScriptExecutor)
// Allow set the script executor if the driver supports it
exec = h
}
interpolateServices(r.getTaskEnv(), r.task)
interpolateServices(r.envBuilder.Build(), r.task)
return r.consul.RegisterTask(r.alloc.ID, r.task, exec)
}
// interpolateServices interpolates tags in a service and checks with values from the
// task's environment.
func interpolateServices(taskEnv *env.TaskEnvironment, task *structs.Task) {
func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) {
for _, service := range task.Services {
for _, check := range service.Checks {
check.Name = taskEnv.ReplaceEnv(check.Name)
@ -1450,6 +1415,9 @@ func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error {
r.persistLock.Lock()
r.taskDirBuilt = true
r.persistLock.Unlock()
// Set path and host related env vars
driver.SetEnvvars(r.envBuilder, fsi, r.taskDir, r.config)
return nil
}
@ -1535,6 +1503,9 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Merge in the task resources
updatedTask.Resources = update.TaskResources[updatedTask.Name]
// Update the task's environment
r.envBuilder.UpdateTask(update, updatedTask)
var mErr multierror.Error
r.handleLock.Lock()
if r.handle != nil {
@ -1550,7 +1521,8 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
}
if err := r.updateServices(drv, r.handle, r.task, updatedTask, update); err != nil {
// Update services in Consul
if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
}
}
@ -1568,17 +1540,13 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
}
// updateServices and checks with Consul.
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task, newAlloc *structs.Allocation) error {
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error {
var exec driver.ScriptExecutor
if d.Abilities().Exec {
// Allow set the script executor if the driver supports it
exec = h
}
newTaskEnv, err := driver.GetTaskEnv(r.taskDir, r.config.Node, new, newAlloc, r.config, r.vaultFuture.Get())
if err != nil {
return err
}
interpolateServices(newTaskEnv, new)
interpolateServices(r.envBuilder.Build(), new)
return r.consul.UpdateTask(r.alloc.ID, old, new, exec)
}

View file

@ -610,10 +610,6 @@ func TestTaskRunner_Validate_UserEnforcement(t *testing.T) {
ctx := testTaskRunner(t, false)
defer ctx.Cleanup()
if err := ctx.tr.setTaskEnv(); err != nil {
t.Fatalf("bad: %v", err)
}
// Try to run as root with exec.
ctx.tr.task.Driver = "exec"
ctx.tr.task.User = "root"

View file

@ -680,6 +680,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
Perms: *template.Perms,
LeftDelim: *template.LeftDelim,
RightDelim: *template.RightDelim,
Envvars: *template.Envvars,
}
}
}

View file

@ -9,7 +9,6 @@ import (
"os/exec"
"time"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kardianos/osext"
)
@ -23,12 +22,6 @@ func Path() string {
return path
}
// SetEnv configures the environment of the task so that Run executes a testtask
// script when called from within cmd.
func SetEnv(env *env.TaskEnvironment) {
env.AppendEnvvars(map[string]string{"TEST_TASK": "execute"})
}
// SetCmdEnv configures the environment of cmd so that Run executes a testtask
// script when called from within cmd.
func SetCmdEnv(cmd *exec.Cmd) {

View file

@ -861,6 +861,7 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error {
"right_delimiter",
"source",
"splay",
"env",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err

View file

@ -181,6 +181,7 @@ func TestParse(t *testing.T) {
ChangeSignal: helper.StringToPtr("foo"),
Splay: helper.TimeToPtr(10 * time.Second),
Perms: helper.StringToPtr("0644"),
Envvars: helper.BoolToPtr(true),
},
{
SourcePath: helper.StringToPtr("bar"),

View file

@ -156,6 +156,7 @@ job "binstore-storagelocker" {
change_mode = "foo"
change_signal = "foo"
splay = "10s"
env = true
}
template {

View file

@ -3781,6 +3781,7 @@ func TestTaskDiff(t *testing.T) {
ChangeSignal: "SIGHUP2",
Splay: 2,
Perms: "0666",
Envvars: true,
},
},
},
@ -3837,6 +3838,12 @@ func TestTaskDiff(t *testing.T) {
Old: "",
New: "baz3",
},
{
Type: DiffTypeAdded,
Name: "Envvars",
Old: "",
New: "false",
},
{
Type: DiffTypeAdded,
Name: "Perms",
@ -3885,6 +3892,12 @@ func TestTaskDiff(t *testing.T) {
Old: "baz2",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Envvars",
Old: "true",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Perms",

View file

@ -2943,6 +2943,18 @@ type Template struct {
// delimiter is utilized when parsing the template.
LeftDelim string
RightDelim string
// Envvars enables exposing the template as environment variables
// instead of as a file. The template must be of the form:
//
// VAR_NAME_1={{ key service/my-key }}
// VAR_NAME_2=raw string and {{ env "attr.kernel.name" }}
//
// Lines will be split on the initial "=" with the first part being the
// key name and the second part the value.
// Empty lines and lines starting with # will be ignored, but to avoid
// escaping issues #s within lines will not be treated as comments.
Envvars bool
}
// DefaultTemplate returns a default template.

View file

@ -44,7 +44,7 @@ Nomad utilizes a tool called [Consul Template][ct]. Since Nomad v0.5.3, the
template can reference [Nomad's runtime environment variables][env]. Since Nomad
v0.5.6, the template can reference [Node attributes and metadata][nodevars]. For
a full list of the API template functions, please refer to the [Consul Template
README][ct].
README][ct]. Since Nomad v0.6.0, templates can be read as environment variables.
## `template` Parameters
@ -68,14 +68,17 @@ README][ct].
- `destination` `(string: <required>)` - Specifies the location where the
resulting template should be rendered, relative to the task directory.
* `left_delimiter` `(string: "{{")` - Specifies the left delimiter to use in the
- `env` `(bool: false)` - Specifies the template should be read back in as
environment variables for the task. (See below)
- `left_delimiter` `(string: "{{")` - Specifies the left delimiter to use in the
template. The default is "{{" for some templates, it may be easier to use a
different delimiter that does not conflict with the output file itself.
- `perms` `(string: "644")` - Specifies the rendered template's permissions.
File permissions are given as octal of the unix file permissions rwxrwxrwx.
* `right_delimiter` `(string: "}}")` - Specifies the right delimiter to use in the
- `right_delimiter` `(string: "}}")` - Specifies the right delimiter to use in the
template. The default is "}}" for some templates, it may be easier to use a
different delimiter that does not conflict with the output file itself.
@ -157,6 +160,46 @@ template {
}
```
### Environment Variables
Since v0.6.0 templates may be used to create environment variables for tasks.
Env templates work exactly like other templates except once they're written,
they're read back in as `KEY=value` pairs. Those key value pairs are included
in the task's environment.
For example the following template stanza:
```hcl
template {
data = <<EOH
# Lines starting with a # are ignored
# Empty lines are also ignored
CORES={{ env "attr.cpu.numcores" }}
SERVICE_KEY={{ key "service/my-key" }}
EOH
destination = "local/file.env"
env = true
}
```
The task's environment would then have environment variables like the
following:
```
CORES=4
SERVICE_KEY=12345678-1234-1234-1234-1234-123456789abc
```
This allows [12factor app](https://12factor.net/config) style environment
variable based configuration while keeping all of the familiar features and
semantics of Nomad templates.
The parser reads each line, discards empty lines or lines starting
with a `#`, and then splits on the first `=`. The first part of the split is
the key name, the second part is the key's value.
## Client Configuration
The `template` block has the following [client configuration