package client

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	ctconf "github.com/hashicorp/consul-template/config"
	"github.com/hashicorp/consul-template/manager"
	"github.com/hashicorp/consul-template/signals"
	"github.com/hashicorp/consul-template/watch"
	multierror "github.com/hashicorp/go-multierror"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/driver/env"
	"github.com/hashicorp/nomad/nomad/structs"
)

var (
	// testRetryRate is used to speed up tests by setting consul-template's
	// retry rate to something low. A zero value leaves the retry rate unset.
	testRetryRate time.Duration = 0
)

// TaskHooks is an interface which provides hooks into the task's life-cycle.
type TaskHooks interface {
	// Restart is used to restart the task
	Restart(source, reason string)

	// Signal is used to signal the task
	Signal(source, reason string, s os.Signal) error

	// UnblockStart is used to unblock the starting of the task. This should be
	// called after prestart work is completed
	UnblockStart(source string)

	// Kill is used to kill the task because of the passed error. If fail is set
	// to true, the task is marked as failed
	Kill(source, reason string, fail bool)
}

// TaskTemplateManager is used to run a set of templates for a given task
type TaskTemplateManager struct {
	// templates is the set of templates we are managing
	templates []*structs.Template

	// lookup allows looking up the set of Nomad templates by their
	// consul-template ID
	lookup map[string][]*structs.Template

	// allRendered marks whether all the templates have been rendered
	allRendered bool

	// hook is used to signal/restart the task as templates are rendered
	hook TaskHooks

	// runner is the consul-template runner
	runner *manager.Runner

	// signals is a lookup map from the string representation of a signal to its
	// actual signal
	signals map[string]os.Signal

	// shutdownCh is used to signal any started goroutine to shutdown
	shutdownCh chan struct{}

	// shutdown marks whether the manager has been shutdown
	shutdown     bool
	shutdownLock sync.Mutex
}

// NewTaskTemplateManager validates its arguments, parses the change signals
// referenced by the templates, builds the consul-template runner and starts
// the manager's run loop in a new goroutine.
//
// An error is returned if hook, config or taskEnv is nil, if taskDir is empty,
// if a template's ChangeSignal cannot be parsed, or if the runner cannot be
// built. Call Stop on the returned manager to shut it down.
func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template,
	allRendered bool, config *config.Config, vaultToken, taskDir string,
	taskEnv *env.TaskEnvironment) (*TaskTemplateManager, error) {

	// Check pre-conditions
	if hook == nil {
		return nil, fmt.Errorf("Invalid task hook given")
	} else if config == nil {
		return nil, fmt.Errorf("Invalid config given")
	} else if taskDir == "" {
		return nil, fmt.Errorf("Invalid task directory given")
	} else if taskEnv == nil {
		return nil, fmt.Errorf("Invalid task environment given")
	}

	tm := &TaskTemplateManager{
		templates:   tmpls,
		allRendered: allRendered,
		hook:        hook,
		shutdownCh:  make(chan struct{}),
	}

	// Parse the signals that we need
	for _, tmpl := range tmpls {
		if tmpl.ChangeSignal == "" {
			continue
		}

		sig, err := signals.Parse(tmpl.ChangeSignal)
		if err != nil {
			// Keep the underlying cause so the caller can see why parsing failed.
			return nil, fmt.Errorf("Failed to parse signal %q: %v", tmpl.ChangeSignal, err)
		}

		// The map is allocated lazily so it stays nil when no template uses a
		// change signal.
		if tm.signals == nil {
			tm.signals = make(map[string]os.Signal)
		}

		tm.signals[tmpl.ChangeSignal] = sig
	}

	// Build the consul-template runner
	runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, taskEnv)
	if err != nil {
		return nil, err
	}
	tm.runner = runner
	tm.lookup = lookup

	go tm.run()
	return tm, nil
}

// Stop is used to stop the consul-template runner. It is safe to call more
// than once; only the first call closes shutdownCh and stops the runner.
func (tm *TaskTemplateManager) Stop() {
	tm.shutdownLock.Lock()
	defer tm.shutdownLock.Unlock()

	if tm.shutdown {
		return
	}

	close(tm.shutdownCh)
	tm.shutdown = true

	// Stop the consul-template runner
	if tm.runner != nil {
		tm.runner.Stop()
	}
}

// run is the long lived loop that handles errors and templates being rendered.
// It first waits for every template to be rendered once (unless allRendered
// was set), unblocks the task start, and then reacts to re-renders by
// signalling or restarting the task according to each template's change mode.
func (tm *TaskTemplateManager) run() {
	// Runner is nil if there is no templates
	if tm.runner == nil {
		// Unblock the start if there is nothing to do
		if !tm.allRendered {
			tm.hook.UnblockStart("consul-template")
		}

		return
	}

	// Start the runner
	go tm.runner.Start()

	// Track when they have all been rendered so we don't signal the task for
	// any render event before hand
	var allRenderedTime time.Time

	// Handle the first rendering
	if !tm.allRendered {
		// Wait till all the templates have been rendered
	WAIT:
		for {
			select {
			case <-tm.shutdownCh:
				return
			case err, ok := <-tm.runner.ErrCh:
				// NOTE(review): a receive on a closed channel is always ready,
				// so if ErrCh were ever closed this loop would spin - confirm
				// the runner never closes it.
				if !ok {
					continue
				}

				tm.hook.Kill("consul-template", err.Error(), true)
			case <-tm.runner.TemplateRenderedCh():
				// A template has been rendered, figure out what to do
				events := tm.runner.RenderEvents()

				// Not all templates have been rendered yet
				if len(events) < len(tm.lookup) {
					continue
				}

				for _, event := range events {
					// This template hasn't been rendered
					if event.LastDidRender.IsZero() {
						continue WAIT
					}
				}

				break WAIT
			}

			// TODO Thinking, I believe we could check every 30 seconds and if
			// they are all would be rendered we should start anyways. That is
			// the reattach mechanism when they have all been rendered
		}

		allRenderedTime = time.Now()
		tm.hook.UnblockStart("consul-template")
	}

	// If all our templates are change mode no-op, then we can exit here
	if tm.allTemplatesNoop() {
		return
	}

	// A lookup for the last time the template was handled
	numTemplates := len(tm.templates)
	handledRenders := make(map[string]time.Time, numTemplates)

	for {
		select {
		case <-tm.shutdownCh:
			return
		case err, ok := <-tm.runner.ErrCh:
			// NOTE(review): same closed-channel caveat as in the first-render
			// loop above.
			if !ok {
				continue
			}

			tm.hook.Kill("consul-template", err.Error(), true)
		case <-tm.runner.TemplateRenderedCh():
			// A template has been rendered, figure out what to do
			var handling []string
			signals := make(map[string]struct{})
			restart := false
			var splay time.Duration

			now := time.Now()
			for id, event := range tm.runner.RenderEvents() {

				// First time through
				if allRenderedTime.After(event.LastDidRender) {
					handledRenders[id] = now
					continue
				}

				// We have already handled this one
				if htime := handledRenders[id]; htime.After(event.LastDidRender) {
					continue
				}

				// Lookup the template and determine what to do
				tmpls, ok := tm.lookup[id]
				if !ok {
					tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true)
					return
				}

				for _, tmpl := range tmpls {
					switch tmpl.ChangeMode {
					case structs.TemplateChangeModeSignal:
						signals[tmpl.ChangeSignal] = struct{}{}
					case structs.TemplateChangeModeRestart:
						restart = true
					case structs.TemplateChangeModeNoop:
						// No-op templates do not contribute to the splay.
						continue
					}

					// Use the largest splay of the templates being acted on.
					if tmpl.Splay > splay {
						splay = tmpl.Splay
					}
				}

				handling = append(handling, id)
			}

			if restart || len(signals) != 0 {
				if splay != 0 {
					// Wait out the splay, but stay responsive to shutdown.
					select {
					case <-time.After(time.Duration(splay)):
					case <-tm.shutdownCh:
						return
					}
				}

				// Update handle time
				now = time.Now()
				for _, id := range handling {
					handledRenders[id] = now
				}

				if restart {
					// A restart takes precedence over any pending signals.
					tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
				} else if len(signals) != 0 {
					var mErr multierror.Error
					for signal := range signals {
						err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
						if err != nil {
							multierror.Append(&mErr, err)
						}
					}

					if err := mErr.ErrorOrNil(); err != nil {
						flat := make([]os.Signal, 0, len(signals))
						for signal := range signals {
							flat = append(flat, tm.signals[signal])
						}
						tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true)
					}
				}
			}
		}
	}
}

// allTemplatesNoop returns whether all the managed templates have change mode noop.
func (tm *TaskTemplateManager) allTemplatesNoop() bool {
	for _, tmpl := range tm.templates {
		if tmpl.ChangeMode != structs.TemplateChangeModeNoop {
			return false
		}
	}

	return true
}

// templateRunner returns a consul-template runner for the given templates and a
// lookup from the runner's template ID to the Nomad templates. If no templates
// are given, a nil template runner and lookup is returned.
func templateRunner(tmpls []*structs.Template, config *config.Config,
	vaultToken, taskDir string, taskEnv *env.TaskEnvironment) (
	*manager.Runner, map[string][]*structs.Template, error) {

	if len(tmpls) == 0 {
		return nil, nil, nil
	}

	runnerConfig, err := runnerConfig(config, vaultToken)
	if err != nil {
		return nil, nil, err
	}

	// Parse the templates
	ctmplMapping := parseTemplateConfigs(tmpls, taskDir, taskEnv)

	// Set the config
	flat := make([]*ctconf.ConfigTemplate, 0, len(ctmplMapping))
	for ctmpl := range ctmplMapping {
		// Copy the loop variable so each appended pointer is distinct.
		local := ctmpl
		flat = append(flat, &local)
	}
	runnerConfig.ConfigTemplates = flat

	runner, err := manager.NewRunner(runnerConfig, false, false)
	if err != nil {
		return nil, nil, err
	}

	// Build the lookup
	idMap := runner.ConfigTemplateMapping()
	lookup := make(map[string][]*structs.Template, len(idMap))
	for id, ctmpls := range idMap {
		for _, ctmpl := range ctmpls {
			templates := lookup[id]
			templates = append(templates, ctmplMapping[ctmpl])
			lookup[id] = templates
		}
	}

	return runner, lookup, nil
}

// parseTemplateConfigs converts the task's templates into consul-template
// configs, returning a map from each generated config back to the Nomad
// template it was built from. Source and destination paths have the task
// environment interpolated and are joined onto taskDir.
func parseTemplateConfigs(tmpls []*structs.Template, taskDir string,
	taskEnv *env.TaskEnvironment) map[ctconf.ConfigTemplate]*structs.Template {
	// Build the task environment
	// TODO Should be able to inject the Nomad env vars into Consul-template for
	// rendering
	taskEnv.Build()

	ctmpls := make(map[ctconf.ConfigTemplate]*structs.Template, len(tmpls))
	for _, tmpl := range tmpls {
		var src, dest string
		if tmpl.SourcePath != "" {
			src = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.SourcePath))
		}
		if tmpl.DestPath != "" {
			dest = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.DestPath))
		}

		ct := ctconf.ConfigTemplate{
			Source:           src,
			Destination:      dest,
			EmbeddedTemplate: tmpl.EmbeddedTmpl,
			Perms:            ctconf.DefaultFilePerms,
			Wait:             &watch.Wait{},
		}

		ctmpls[ct] = tmpl
	}

	return ctmpls
}

// runnerConfig returns a consul-template runner configuration, setting the
// Vault and Consul configurations based on the clients configs. An error is
// returned if the Consul Auth value is not of the form "username:password".
func runnerConfig(config *config.Config, vaultToken string) (*ctconf.Config, error) {
	conf := &ctconf.Config{}

	// set marks each of the given keys as explicitly set on conf.
	set := func(keys []string) {
		for _, k := range keys {
			conf.Set(k)
		}
	}

	if testRetryRate != 0 {
		conf.Retry = testRetryRate
		conf.Set("retry")
	}

	// Setup the Consul config
	if config.ConsulConfig != nil {
		conf.Consul = config.ConsulConfig.Addr
		conf.Token = config.ConsulConfig.Token
		set([]string{"consul", "token"})

		if config.ConsulConfig.EnableSSL {
			conf.SSL = &ctconf.SSLConfig{
				Enabled: true,
				Verify:  config.ConsulConfig.VerifySSL,
				Cert:    config.ConsulConfig.CertFile,
				Key:     config.ConsulConfig.KeyFile,
				CaCert:  config.ConsulConfig.CAFile,
			}
			set([]string{"ssl", "ssl.enabled", "ssl.verify", "ssl.cert", "ssl.key", "ssl.ca_cert"})
		}

		if config.ConsulConfig.Auth != "" {
			parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2)
			if len(parts) != 2 {
				return nil, fmt.Errorf("Failed to parse Consul Auth config")
			}

			conf.Auth = &ctconf.AuthConfig{
				Enabled:  true,
				Username: parts[0],
				Password: parts[1],
			}
			set([]string{"auth", "auth.username", "auth.password", "auth.enabled"})
		}
	}

	// Setup the Vault config
	if config.VaultConfig != nil && config.VaultConfig.IsEnabled() {
		conf.Vault = &ctconf.VaultConfig{
			Address:    config.VaultConfig.Addr,
			Token:      vaultToken,
			RenewToken: false,
		}
		set([]string{"vault", "vault.address", "vault.token", "vault.renew_token"})

		if strings.HasPrefix(config.VaultConfig.Addr, "https") || config.VaultConfig.TLSCertFile != "" {
			// Verify the server certificate unless the operator explicitly
			// asked to skip verification. An unset TLSSkipVerify means verify.
			verify := config.VaultConfig.TLSSkipVerify == nil || !*config.VaultConfig.TLSSkipVerify
			conf.Vault.SSL = &ctconf.SSLConfig{
				Enabled: true,
				Verify:  verify,
				Cert:    config.VaultConfig.TLSCertFile,
				Key:     config.VaultConfig.TLSKeyFile,
				CaCert:  config.VaultConfig.TLSCaFile,
				CaPath:  config.VaultConfig.TLSCaPath,
			}

			// NOTE(review): CaPath is populated above but no corresponding key
			// is marked as set here - confirm whether one is required.
			set([]string{"vault.ssl", "vault.ssl.enabled", "vault.ssl.verify",
				"vault.ssl.cert", "vault.ssl.key", "vault.ssl.ca_cert"})
		}
	}

	return conf, nil
}