566 lines
14 KiB
Go
566 lines
14 KiB
Go
package client
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
ctconf "github.com/hashicorp/consul-template/config"
|
|
"github.com/hashicorp/consul-template/manager"
|
|
"github.com/hashicorp/consul-template/signals"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/nomad/client/config"
|
|
"github.com/hashicorp/nomad/client/driver/env"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
const (
|
|
// hostSrcOption is the Client option that determines whether the template
|
|
// source may be from the host
|
|
hostSrcOption = "template.allow_host_source"
|
|
)
|
|
|
|
var (
|
|
// testRetryRate is used to speed up tests by setting consul-templates retry
|
|
// rate to something low
|
|
testRetryRate time.Duration = 0
|
|
)
|
|
|
|
// TaskHooks is an interface which provides hooks into the tasks life-cycle
|
|
type TaskHooks interface {
|
|
// Restart is used to restart the task
|
|
Restart(source, reason string)
|
|
|
|
// Signal is used to signal the task
|
|
Signal(source, reason string, s os.Signal) error
|
|
|
|
// UnblockStart is used to unblock the starting of the task. This should be
|
|
// called after prestart work is completed
|
|
UnblockStart(source string)
|
|
|
|
// Kill is used to kill the task because of the passed error. If fail is set
|
|
// to true, the task is marked as failed
|
|
Kill(source, reason string, fail bool)
|
|
}
|
|
|
|
// TaskTemplateManager is used to run a set of templates for a given task
|
|
type TaskTemplateManager struct {
|
|
// templates is the set of templates we are managing
|
|
templates []*structs.Template
|
|
|
|
// lookup allows looking up the set of Nomad templates by their consul-template ID
|
|
lookup map[string][]*structs.Template
|
|
|
|
// hooks is used to signal/restart the task as templates are rendered
|
|
hook TaskHooks
|
|
|
|
// runner is the consul-template runner
|
|
runner *manager.Runner
|
|
|
|
// signals is a lookup map from the string representation of a signal to its
|
|
// actual signal
|
|
signals map[string]os.Signal
|
|
|
|
// shutdownCh is used to signal and started goroutine to shutdown
|
|
shutdownCh chan struct{}
|
|
|
|
// shutdown marks whether the manager has been shutdown
|
|
shutdown bool
|
|
shutdownLock sync.Mutex
|
|
}
|
|
|
|
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
|
|
config *config.Config, vaultToken, taskDir string,
|
|
envBuilder *env.Builder) (*TaskTemplateManager, error) {
|
|
|
|
// Check pre-conditions
|
|
if hook == nil {
|
|
return nil, fmt.Errorf("Invalid task hook given")
|
|
} else if config == nil {
|
|
return nil, fmt.Errorf("Invalid config given")
|
|
} else if taskDir == "" {
|
|
return nil, fmt.Errorf("Invalid task directory given")
|
|
} else if envBuilder == nil {
|
|
return nil, fmt.Errorf("Invalid task environment given")
|
|
}
|
|
|
|
tm := &TaskTemplateManager{
|
|
templates: tmpls,
|
|
hook: hook,
|
|
shutdownCh: make(chan struct{}),
|
|
}
|
|
|
|
// Parse the signals that we need
|
|
for _, tmpl := range tmpls {
|
|
if tmpl.ChangeSignal == "" {
|
|
continue
|
|
}
|
|
|
|
sig, err := signals.Parse(tmpl.ChangeSignal)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to parse signal %q", tmpl.ChangeSignal)
|
|
}
|
|
|
|
if tm.signals == nil {
|
|
tm.signals = make(map[string]os.Signal)
|
|
}
|
|
|
|
tm.signals[tmpl.ChangeSignal] = sig
|
|
}
|
|
|
|
// Build the consul-template runner
|
|
runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tm.runner = runner
|
|
tm.lookup = lookup
|
|
|
|
go tm.run(envBuilder, taskDir)
|
|
return tm, nil
|
|
}
|
|
|
|
// Stop is used to stop the consul-template runner
|
|
func (tm *TaskTemplateManager) Stop() {
|
|
tm.shutdownLock.Lock()
|
|
defer tm.shutdownLock.Unlock()
|
|
|
|
if tm.shutdown {
|
|
return
|
|
}
|
|
|
|
close(tm.shutdownCh)
|
|
tm.shutdown = true
|
|
|
|
// Stop the consul-template runner
|
|
if tm.runner != nil {
|
|
tm.runner.Stop()
|
|
}
|
|
}
|
|
|
|
// run is the long lived loop that handles errors and templates being rendered
|
|
func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) {
|
|
// Runner is nil if there is no templates
|
|
if tm.runner == nil {
|
|
// Unblock the start if there is nothing to do
|
|
tm.hook.UnblockStart("consul-template")
|
|
return
|
|
}
|
|
|
|
// Start the runner
|
|
go tm.runner.Start()
|
|
|
|
// Track when they have all been rendered so we don't signal the task for
|
|
// any render event before hand
|
|
var allRenderedTime time.Time
|
|
|
|
// Handle the first rendering
|
|
// Wait till all the templates have been rendered
|
|
WAIT:
|
|
for {
|
|
select {
|
|
case <-tm.shutdownCh:
|
|
return
|
|
case err, ok := <-tm.runner.ErrCh:
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
tm.hook.Kill("consul-template", err.Error(), true)
|
|
case <-tm.runner.TemplateRenderedCh():
|
|
// A template has been rendered, figure out what to do
|
|
events := tm.runner.RenderEvents()
|
|
|
|
// Not all templates have been rendered yet
|
|
if len(events) < len(tm.lookup) {
|
|
continue
|
|
}
|
|
|
|
for _, event := range events {
|
|
// This template hasn't been rendered
|
|
if event.LastWouldRender.IsZero() {
|
|
continue WAIT
|
|
}
|
|
}
|
|
|
|
break WAIT
|
|
}
|
|
}
|
|
|
|
// Read environment variables from env templates
|
|
for _, t := range tm.templates {
|
|
if err := loadTemplateEnv(envBuilder, taskDir, t); err != nil {
|
|
tm.hook.Kill("consul-template", err.Error(), true)
|
|
return
|
|
}
|
|
}
|
|
|
|
allRenderedTime = time.Now()
|
|
tm.hook.UnblockStart("consul-template")
|
|
|
|
// If all our templates are change mode no-op, then we can exit here
|
|
if tm.allTemplatesNoop() {
|
|
return
|
|
}
|
|
|
|
// A lookup for the last time the template was handled
|
|
numTemplates := len(tm.templates)
|
|
handledRenders := make(map[string]time.Time, numTemplates)
|
|
|
|
for {
|
|
select {
|
|
case <-tm.shutdownCh:
|
|
return
|
|
case err, ok := <-tm.runner.ErrCh:
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
tm.hook.Kill("consul-template", err.Error(), true)
|
|
case <-tm.runner.TemplateRenderedCh():
|
|
// A template has been rendered, figure out what to do
|
|
var handling []string
|
|
signals := make(map[string]struct{})
|
|
restart := false
|
|
var splay time.Duration
|
|
|
|
events := tm.runner.RenderEvents()
|
|
for id, event := range events {
|
|
|
|
// First time through
|
|
if allRenderedTime.After(event.LastDidRender) || allRenderedTime.Equal(event.LastDidRender) {
|
|
handledRenders[id] = allRenderedTime
|
|
continue
|
|
}
|
|
|
|
// We have already handled this one
|
|
if htime := handledRenders[id]; htime.After(event.LastDidRender) || htime.Equal(event.LastDidRender) {
|
|
continue
|
|
}
|
|
|
|
// Lookup the template and determine what to do
|
|
tmpls, ok := tm.lookup[id]
|
|
if !ok {
|
|
tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true)
|
|
return
|
|
}
|
|
|
|
for _, tmpl := range tmpls {
|
|
if err := loadTemplateEnv(envBuilder, taskDir, tmpl); err != nil {
|
|
|
|
tm.hook.Kill("consul-template", err.Error(), true)
|
|
return
|
|
}
|
|
switch tmpl.ChangeMode {
|
|
case structs.TemplateChangeModeSignal:
|
|
signals[tmpl.ChangeSignal] = struct{}{}
|
|
case structs.TemplateChangeModeRestart:
|
|
restart = true
|
|
case structs.TemplateChangeModeNoop:
|
|
continue
|
|
}
|
|
|
|
if tmpl.Splay > splay {
|
|
splay = tmpl.Splay
|
|
}
|
|
}
|
|
|
|
handling = append(handling, id)
|
|
}
|
|
|
|
if restart || len(signals) != 0 {
|
|
if splay != 0 {
|
|
ns := splay.Nanoseconds()
|
|
offset := rand.Int63n(ns)
|
|
t := time.Duration(offset)
|
|
|
|
select {
|
|
case <-time.After(t):
|
|
case <-tm.shutdownCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
// Update handle time
|
|
for _, id := range handling {
|
|
handledRenders[id] = events[id].LastDidRender
|
|
}
|
|
|
|
if restart {
|
|
tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
|
|
} else if len(signals) != 0 {
|
|
var mErr multierror.Error
|
|
for signal := range signals {
|
|
err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
|
|
if err != nil {
|
|
multierror.Append(&mErr, err)
|
|
}
|
|
}
|
|
|
|
if err := mErr.ErrorOrNil(); err != nil {
|
|
flat := make([]os.Signal, 0, len(signals))
|
|
for signal := range signals {
|
|
flat = append(flat, tm.signals[signal])
|
|
}
|
|
tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// allTemplatesNoop returns whether all the managed templates have change mode noop.
|
|
func (tm *TaskTemplateManager) allTemplatesNoop() bool {
|
|
for _, tmpl := range tm.templates {
|
|
if tmpl.ChangeMode != structs.TemplateChangeModeNoop {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// templateRunner returns a consul-template runner for the given templates and a
|
|
// lookup by destination to the template. If no templates are given, a nil
|
|
// template runner and lookup is returned.
|
|
func templateRunner(tmpls []*structs.Template, config *config.Config,
|
|
vaultToken, taskDir string, taskEnv *env.TaskEnv) (
|
|
*manager.Runner, map[string][]*structs.Template, error) {
|
|
|
|
if len(tmpls) == 0 {
|
|
return nil, nil, nil
|
|
}
|
|
|
|
runnerConfig, err := runnerConfig(config, vaultToken)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Parse the templates
|
|
allowAbs := config.ReadBoolDefault(hostSrcOption, true)
|
|
ctmplMapping, err := parseTemplateConfigs(tmpls, taskDir, taskEnv, allowAbs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Set the config
|
|
flat := ctconf.TemplateConfigs(make([]*ctconf.TemplateConfig, 0, len(ctmplMapping)))
|
|
for ctmpl := range ctmplMapping {
|
|
local := ctmpl
|
|
flat = append(flat, &local)
|
|
}
|
|
runnerConfig.Templates = &flat
|
|
|
|
runner, err := manager.NewRunner(runnerConfig, false, false)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Set Nomad's environment variables
|
|
runner.Env = taskEnv.All()
|
|
|
|
// Build the lookup
|
|
idMap := runner.TemplateConfigMapping()
|
|
lookup := make(map[string][]*structs.Template, len(idMap))
|
|
for id, ctmpls := range idMap {
|
|
for _, ctmpl := range ctmpls {
|
|
templates := lookup[id]
|
|
templates = append(templates, ctmplMapping[ctmpl])
|
|
lookup[id] = templates
|
|
}
|
|
}
|
|
|
|
return runner, lookup, nil
|
|
}
|
|
|
|
// parseTemplateConfigs converts the tasks templates into consul-templates
|
|
func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
|
|
taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) {
|
|
|
|
ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls))
|
|
for _, tmpl := range tmpls {
|
|
var src, dest string
|
|
if tmpl.SourcePath != "" {
|
|
if filepath.IsAbs(tmpl.SourcePath) {
|
|
if !allowAbs {
|
|
return nil, fmt.Errorf("Specifying absolute template paths disallowed by client config: %q", tmpl.SourcePath)
|
|
}
|
|
|
|
src = tmpl.SourcePath
|
|
} else {
|
|
src = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.SourcePath))
|
|
}
|
|
}
|
|
if tmpl.DestPath != "" {
|
|
dest = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.DestPath))
|
|
}
|
|
|
|
ct := ctconf.DefaultTemplateConfig()
|
|
ct.Source = &src
|
|
ct.Destination = &dest
|
|
ct.Contents = &tmpl.EmbeddedTmpl
|
|
ct.LeftDelim = &tmpl.LeftDelim
|
|
ct.RightDelim = &tmpl.RightDelim
|
|
|
|
// Set the permissions
|
|
if tmpl.Perms != "" {
|
|
v, err := strconv.ParseUint(tmpl.Perms, 8, 12)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to parse %q as octal: %v", tmpl.Perms, err)
|
|
}
|
|
m := os.FileMode(v)
|
|
ct.Perms = &m
|
|
}
|
|
ct.Finalize()
|
|
|
|
ctmpls[*ct] = tmpl
|
|
}
|
|
|
|
return ctmpls, nil
|
|
}
|
|
|
|
// runnerConfig returns a consul-template runner configuration, setting the
|
|
// Vault and Consul configurations based on the clients configs.
|
|
func runnerConfig(config *config.Config, vaultToken string) (*ctconf.Config, error) {
|
|
conf := ctconf.DefaultConfig()
|
|
|
|
t, f := true, false
|
|
|
|
// Force faster retries
|
|
if testRetryRate != 0 {
|
|
rate := testRetryRate
|
|
conf.Consul.Retry.Backoff = &rate
|
|
}
|
|
|
|
// Setup the Consul config
|
|
if config.ConsulConfig != nil {
|
|
conf.Consul.Address = &config.ConsulConfig.Addr
|
|
conf.Consul.Token = &config.ConsulConfig.Token
|
|
|
|
if config.ConsulConfig.EnableSSL != nil && *config.ConsulConfig.EnableSSL {
|
|
verify := config.ConsulConfig.VerifySSL != nil && *config.ConsulConfig.VerifySSL
|
|
conf.Consul.SSL = &ctconf.SSLConfig{
|
|
Enabled: &t,
|
|
Verify: &verify,
|
|
Cert: &config.ConsulConfig.CertFile,
|
|
Key: &config.ConsulConfig.KeyFile,
|
|
CaCert: &config.ConsulConfig.CAFile,
|
|
}
|
|
}
|
|
|
|
if config.ConsulConfig.Auth != "" {
|
|
parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2)
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("Failed to parse Consul Auth config")
|
|
}
|
|
|
|
conf.Consul.Auth = &ctconf.AuthConfig{
|
|
Enabled: &t,
|
|
Username: &parts[0],
|
|
Password: &parts[1],
|
|
}
|
|
}
|
|
}
|
|
|
|
// Setup the Vault config
|
|
// Always set these to ensure nothing is picked up from the environment
|
|
emptyStr := ""
|
|
conf.Vault.RenewToken = &f
|
|
conf.Vault.Token = &emptyStr
|
|
if config.VaultConfig != nil && config.VaultConfig.IsEnabled() {
|
|
conf.Vault.Address = &config.VaultConfig.Addr
|
|
conf.Vault.Token = &vaultToken
|
|
|
|
if strings.HasPrefix(config.VaultConfig.Addr, "https") || config.VaultConfig.TLSCertFile != "" {
|
|
skipVerify := config.VaultConfig.TLSSkipVerify != nil && *config.VaultConfig.TLSSkipVerify
|
|
verify := !skipVerify
|
|
conf.Vault.SSL = &ctconf.SSLConfig{
|
|
Enabled: &t,
|
|
Verify: &verify,
|
|
Cert: &config.VaultConfig.TLSCertFile,
|
|
Key: &config.VaultConfig.TLSKeyFile,
|
|
CaCert: &config.VaultConfig.TLSCaFile,
|
|
CaPath: &config.VaultConfig.TLSCaPath,
|
|
}
|
|
} else {
|
|
conf.Vault.SSL = &ctconf.SSLConfig{
|
|
Enabled: &f,
|
|
Verify: &f,
|
|
Cert: &emptyStr,
|
|
Key: &emptyStr,
|
|
CaCert: &emptyStr,
|
|
CaPath: &emptyStr,
|
|
}
|
|
}
|
|
}
|
|
|
|
conf.Finalize()
|
|
return conf, nil
|
|
}
|
|
|
|
// loadTemplateEnv loads task environment variables from templates.
|
|
func loadTemplateEnv(builder *env.Builder, taskDir string, t *structs.Template) error {
|
|
if !t.Envvars {
|
|
return nil
|
|
}
|
|
f, err := os.Open(filepath.Join(taskDir, t.DestPath))
|
|
if err != nil {
|
|
return fmt.Errorf("error opening env template: %v", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
// Parse environment fil
|
|
vars, err := parseEnvFile(f)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing env template %q: %v", t.DestPath, err)
|
|
}
|
|
|
|
// Set the environment variables
|
|
builder.SetTemplateEnv(vars)
|
|
return nil
|
|
}
|
|
|
|
// parseEnvFile and return a map of the environment variables suitable for
|
|
// TaskEnvironment.AppendEnvvars or an error.
|
|
//
|
|
// See nomad/structs#Template.Envvars comment for format.
|
|
func parseEnvFile(r io.Reader) (map[string]string, error) {
|
|
vars := make(map[string]string, 50)
|
|
lines := 0
|
|
scanner := bufio.NewScanner(r)
|
|
for scanner.Scan() {
|
|
lines++
|
|
buf := scanner.Bytes()
|
|
if len(buf) == 0 {
|
|
// Skip empty lines
|
|
continue
|
|
}
|
|
if buf[0] == '#' {
|
|
// Skip lines starting with a #
|
|
continue
|
|
}
|
|
n := bytes.IndexByte(buf, '=')
|
|
if n == -1 {
|
|
return nil, fmt.Errorf("line %d: no '=' sign: %q", lines, string(buf))
|
|
}
|
|
if len(buf) > n {
|
|
vars[string(buf[0:n])] = string(buf[n+1 : len(buf)])
|
|
} else {
|
|
vars[string(buf[0:n])] = ""
|
|
}
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return vars, nil
|
|
}
|