//+build linux,lxc package lxc import ( "context" "fmt" "strconv" "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/drivers/shared/eventer" "github.com/hashicorp/nomad/plugins/base" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/plugins/shared/hclspec" "github.com/hashicorp/nomad/plugins/shared/loader" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "gopkg.in/lxc/go-lxc.v2" ) const ( // pluginName is the name of the plugin pluginName = "lxc" // fingerprintPeriod is the interval at which the driver will send fingerprint responses fingerprintPeriod = 30 * time.Second ) var ( // PluginID is the rawexec plugin metadata registered in the plugin // catalog. PluginID = loader.PluginID{ Name: pluginName, PluginType: base.PluginTypeDriver, } // PluginConfig is the rawexec factory function registered in the // plugin catalog. PluginConfig = &loader.InternalPluginConfig{ Config: map[string]interface{}{}, Factory: func(l hclog.Logger) interface{} { return NewLXCDriver(l) }, } ) // PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options. func PluginLoader(opts map[string]string) (map[string]interface{}, error) { conf := map[string]interface{}{} if v, err := strconv.ParseBool(opts["driver.lxc.enable"]); err == nil { conf["enabled"] = v } if v, err := strconv.ParseBool(opts["lxc.volumes.enabled"]); err == nil { conf["volumes"] = v } if v, ok := opts["driver.lxc.path"]; ok { conf["path"] = v } return conf, nil } var ( // pluginInfo is the response returned for the PluginInfo RPC pluginInfo = &base.PluginInfoResponse{ Type: base.PluginTypeDriver, PluginApiVersion: "0.0.1", PluginVersion: "0.1.0", Name: pluginName, } // configSpec is the hcl specification returned by the ConfigSchema RPC configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ "enabled": hclspec.NewDefault( hclspec.NewAttr("enabled", "bool", false), hclspec.NewLiteral("true"), ), "volumes": hclspec.NewDefault( hclspec.NewAttr("volumes", "bool", false), hclspec.NewLiteral("true"), ), "path": hclspec.NewDefault( hclspec.NewAttr("path", "string", false), hclspec.NewLiteral("\"\""), ), }) // taskConfigSpec is the hcl specification for the driver config section of // a task within a job. It is returned in the TaskConfigSchema RPC taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ "template": hclspec.NewAttr("template", "string", true), "distro": hclspec.NewAttr("distro", "string", false), "release": hclspec.NewAttr("release", "string", false), "arch": hclspec.NewAttr("arch", "string", false), "image_variant": hclspec.NewAttr("image_variant", "string", false), "image_server": hclspec.NewAttr("image_server", "string", false), "gpg_key_id": hclspec.NewAttr("gpg_key_id", "string", false), "gpg_key_server": hclspec.NewAttr("gpg_key_server", "string", false), "disable_gpg": hclspec.NewAttr("disable_gpg", "string", false), "flush_cache": hclspec.NewAttr("flush_cache", "string", false), "force_cache": hclspec.NewAttr("force_cache", "string", false), "template_args": hclspec.NewAttr("template_args", "list(string)", false), "log_level": hclspec.NewAttr("log_level", "string", false), "verbosity": hclspec.NewAttr("verbosity", "string", false), "volumes": hclspec.NewAttr("volumes", "list(string)", false), }) // capabilities is returned by the Capabilities RPC and indicates what // optional features this driver supports capabilities = &drivers.Capabilities{ SendSignals: false, Exec: false, FSIsolation: cstructs.FSIsolationImage, } ) // Driver is a driver for running LXC containers type Driver struct { // eventer is used to handle multiplexing of TaskEvents calls such that an // event can be broadcast to all callers eventer *eventer.Eventer // config is the driver configuration set by the SetConfig RPC config *Config // nomadConfig is the client config from nomad nomadConfig *base.ClientDriverConfig // tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles tasks *taskStore // ctx is the context for the driver. It is passed to other subsystems to // coordinate shutdown ctx context.Context // signalShutdown is called when the driver is shutting down and cancels the // ctx passed to any subsystems signalShutdown context.CancelFunc // logger will log to the Nomad agent logger hclog.Logger } // Config is the driver configuration set by the SetConfig RPC call type Config struct { // Enabled is set to true to enable the lxc driver Enabled bool `codec:"enabled"` AllowVolumes bool `codec:"volumes"` Path string `codec:"path"` } // TaskConfig is the driver configuration of a task within a job type TaskConfig struct { Template string `codec:"template"` Distro string `codec:"distro"` Release string `codec:"release"` Arch string `codec:"arch"` ImageVariant string `codec:"image_variant"` ImageServer string `codec:"image_server"` GPGKeyID string `codec:"gpg_key_id"` GPGKeyServer string `codec:"gpg_key_server"` DisableGPGValidation bool `codec:"disable_gpg"` FlushCache bool `codec:"flush_cache"` ForceCache bool `codec:"force_cache"` TemplateArgs []string `codec:"template_args"` LogLevel string `codec:"log_level"` Verbosity string `codec:"verbosity"` Volumes []string `codec:"volumes"` } // TaskState is the state which is encoded in the handle returned in // StartTask. This information is needed to rebuild the task state and handler // during recovery. type TaskState struct { TaskConfig *drivers.TaskConfig ContainerName string StartedAt time.Time } // NewLXCDriver returns a new DriverPlugin implementation func NewLXCDriver(logger hclog.Logger) drivers.DriverPlugin { ctx, cancel := context.WithCancel(context.Background()) logger = logger.Named(pluginName) return &Driver{ eventer: eventer.NewEventer(ctx, logger), config: &Config{}, tasks: newTaskStore(), ctx: ctx, signalShutdown: cancel, logger: logger, } } func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { return pluginInfo, nil } func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { return configSpec, nil } func (d *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error { var config Config if err := base.MsgPackDecode(data, &config); err != nil { return err } d.config = &config if cfg != nil { d.nomadConfig = cfg.Driver } return nil } func (d *Driver) Shutdown(ctx context.Context) error { d.signalShutdown() return nil } func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { return taskConfigSpec, nil } func (d *Driver) Capabilities() (*drivers.Capabilities, error) { return capabilities, nil } func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { ch := make(chan *drivers.Fingerprint) go d.handleFingerprint(ctx, ch) return ch, nil } func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) { defer close(ch) ticker := time.NewTimer(0) for { select { case <-ctx.Done(): return case <-d.ctx.Done(): return case <-ticker.C: ticker.Reset(fingerprintPeriod) ch <- d.buildFingerprint() } } } func (d *Driver) buildFingerprint() *drivers.Fingerprint { var health drivers.HealthState var desc string attrs := map[string]*pstructs.Attribute{} lxcVersion := lxc.Version() if d.config.Enabled && lxcVersion != "" { health = drivers.HealthStateHealthy desc = "ready" attrs["driver.lxc"] = pstructs.NewBoolAttribute(true) attrs["driver.lxc.version"] = pstructs.NewStringAttribute(lxcVersion) } else { health = drivers.HealthStateUndetected desc = "disabled" } if d.config.AllowVolumes { attrs["driver.lxc.volumes.enabled"] = pstructs.NewBoolAttribute(true) } return &drivers.Fingerprint{ Attributes: attrs, Health: health, HealthDescription: desc, } } func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if handle == nil { return fmt.Errorf("error: handle cannot be nil") } if _, ok := d.tasks.Get(handle.Config.ID); ok { return nil } var taskState TaskState if err := handle.GetDriverState(&taskState); err != nil { return fmt.Errorf("failed to decode task state from handle: %v", err) } var driverConfig TaskConfig if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil { return fmt.Errorf("failed to decode driver config: %v", err) } c, err := lxc.NewContainer(taskState.ContainerName, d.lxcPath()) if err != nil { return fmt.Errorf("failed to create container ref: %v", err) } initPid := c.InitPid() h := &taskHandle{ container: c, initPid: initPid, taskConfig: taskState.TaskConfig, procState: drivers.TaskStateRunning, startedAt: taskState.StartedAt, exitResult: &drivers.ExitResult{}, totalCpuStats: stats.NewCpuStats(), userCpuStats: stats.NewCpuStats(), systemCpuStats: stats.NewCpuStats(), } d.tasks.Set(taskState.TaskConfig.ID, h) go h.run() return nil } func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { if _, ok := d.tasks.Get(cfg.ID); ok { return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID) } var driverConfig TaskConfig if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) } d.logger.Info("starting lxc task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) handle := drivers.NewTaskHandle(pluginName) handle.Config = cfg c, err := d.initializeContainer(cfg, driverConfig) if err != nil { return nil, nil, err } opt := toLXCCreateOptions(driverConfig) if err := c.Create(opt); err != nil { return nil, nil, fmt.Errorf("unable to create container: %v", err) } cleanup := func() { if err := c.Destroy(); err != nil { d.logger.Error("failed to clean up from an error in Start", "error", err) } } if err := d.configureContainerNetwork(c); err != nil { cleanup() return nil, nil, err } if err := d.mountVolumes(c, cfg, driverConfig); err != nil { cleanup() return nil, nil, err } if err := c.Start(); err != nil { cleanup() return nil, nil, fmt.Errorf("unable to start container: %v", err) } if err := d.setResourceLimits(c, cfg); err != nil { cleanup() return nil, nil, err } pid := c.InitPid() h := &taskHandle{ container: c, initPid: pid, taskConfig: cfg, procState: drivers.TaskStateRunning, startedAt: time.Now().Round(time.Millisecond), logger: d.logger, totalCpuStats: stats.NewCpuStats(), userCpuStats: stats.NewCpuStats(), systemCpuStats: stats.NewCpuStats(), } driverState := TaskState{ ContainerName: c.Name(), TaskConfig: cfg, StartedAt: h.startedAt, } if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err) cleanup() return nil, nil, fmt.Errorf("failed to set driver state: %v", err) } d.tasks.Set(cfg.ID, h) go h.run() return handle, nil, nil } func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound } ch := make(chan *drivers.ExitResult) go d.handleWait(ctx, handle, ch) return ch, nil } func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { defer close(ch) // // Wait for process completion by polling status from handler. // We cannot use the following alternatives: // * Process.Wait() requires LXC container processes to be children // of self process; but LXC runs container in separate PID hierarchy // owned by PID 1. // * lxc.Container.Wait() holds a write lock on container and prevents // any other calls, including stats. // // Going with simplest approach of polling for handler to mark exit. ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-d.ctx.Done(): return case <-ticker.C: s := handle.TaskStatus() if s.State == drivers.TaskStateExited { ch <- handle.exitResult } } } } func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound } if err := handle.shutdown(timeout); err != nil { return fmt.Errorf("executor Shutdown failed: %v", err) } return nil } func (d *Driver) DestroyTask(taskID string, force bool) error { handle, ok := d.tasks.Get(taskID) if !ok { return drivers.ErrTaskNotFound } if handle.IsRunning() && !force { return fmt.Errorf("cannot destroy running task") } if handle.IsRunning() { // grace period is chosen arbitrary here if err := handle.shutdown(1 * time.Minute); err != nil { handle.logger.Error("failed to destroy executor", "err", err) } } d.tasks.Delete(taskID) return nil } func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound } return handle.TaskStatus(), nil } func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) { handle, ok := d.tasks.Get(taskID) if !ok { return nil, drivers.ErrTaskNotFound } return handle.stats() } func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { return d.eventer.TaskEvents(ctx) } func (d *Driver) SignalTask(taskID string, signal string) error { return fmt.Errorf("LXC driver does not support signals") } func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { return nil, fmt.Errorf("LXC driver does not support exec") }