//+build linux,lxc package driver import ( "context" "encoding/json" "fmt" "log" "os" "path/filepath" "strconv" "strings" "syscall" "time" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/helper/fields" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" dstructs "github.com/hashicorp/nomad/client/driver/structs" cstructs "github.com/hashicorp/nomad/client/structs" lxc "gopkg.in/lxc/go-lxc.v2" ) const ( // lxcConfigOption is the key for enabling the LXC driver in the // Config.Options map. lxcConfigOption = "driver.lxc.enable" // lxcVolumesConfigOption is the key for enabling the use of // custom bind volumes to arbitrary host paths lxcVolumesConfigOption = "lxc.volumes.enabled" lxcVolumesConfigDefault = true // containerMonitorIntv is the interval at which the driver checks if the // container is still alive containerMonitorIntv = 2 * time.Second ) var ( LXCMeasuredCpuStats = []string{"System Mode", "User Mode", "Percent"} LXCMeasuredMemStats = []string{"RSS", "Cache", "Swap", "Max Usage", "Kernel Usage", "Kernel Max Usage"} ) // Add the lxc driver to the list of builtin drivers func init() { BuiltinDrivers["lxc"] = NewLxcDriver } // LxcDriver allows users to run LXC Containers type LxcDriver struct { DriverContext fingerprint.StaticFingerprinter } // LxcDriverConfig is the configuration of the LXC Container type LxcDriverConfig struct { Template string Distro string Release string Arch string ImageVariant string `mapstructure:"image_variant"` ImageServer string `mapstructure:"image_server"` GPGKeyID string `mapstructure:"gpg_key_id"` GPGKeyServer string `mapstructure:"gpg_key_server"` DisableGPGValidation bool `mapstructure:"disable_gpg"` FlushCache bool `mapstructure:"flush_cache"` ForceCache bool `mapstructure:"force_cache"` TemplateArgs []string `mapstructure:"template_args"` LogLevel string `mapstructure:"log_level"` Verbosity string Volumes []string `mapstructure:"volumes"` } // NewLxcDriver returns a new instance of the LXC driver func NewLxcDriver(ctx *DriverContext) Driver { return &LxcDriver{DriverContext: *ctx} } // Validate validates the lxc driver configuration func (d *LxcDriver) Validate(config map[string]interface{}) error { fd := &fields.FieldData{ Raw: config, Schema: map[string]*fields.FieldSchema{ "template": { Type: fields.TypeString, Required: true, }, "distro": { Type: fields.TypeString, Required: false, }, "release": { Type: fields.TypeString, Required: false, }, "arch": { Type: fields.TypeString, Required: false, }, "image_variant": { Type: fields.TypeString, Required: false, }, "image_server": { Type: fields.TypeString, Required: false, }, "gpg_key_id": { Type: fields.TypeString, Required: false, }, "gpg_key_server": { Type: fields.TypeString, Required: false, }, "disable_gpg": { Type: fields.TypeString, Required: false, }, "flush_cache": { Type: fields.TypeString, Required: false, }, "force_cache": { Type: fields.TypeString, Required: false, }, "template_args": { Type: fields.TypeArray, Required: false, }, "log_level": { Type: fields.TypeString, Required: false, }, "verbosity": { Type: fields.TypeString, Required: false, }, "volumes": { Type: fields.TypeArray, Required: false, }, }, } if err := fd.Validate(); err != nil { return err } volumes, _ := fd.GetOk("volumes") for _, volDesc := range volumes.([]interface{}) { volStr := volDesc.(string) paths := strings.Split(volStr, ":") if len(paths) != 2 { return fmt.Errorf("invalid volume bind mount entry: '%s'", volStr) } if len(paths[0]) == 0 || len(paths[1]) == 0 { return fmt.Errorf("invalid volume bind mount entry: '%s'", volStr) } if paths[1][0] == '/' { return fmt.Errorf("unsupported absolute container mount point: '%s'", paths[1]) } } return nil } func (d *LxcDriver) Abilities() DriverAbilities { return DriverAbilities{ SendSignals: false, Exec: false, } } func (d *LxcDriver) FSIsolation() cstructs.FSIsolation { return cstructs.FSIsolationImage } // Fingerprint fingerprints the lxc driver configuration func (d *LxcDriver) Fingerprint(req *cstructs.FingerprintRequest, resp *cstructs.FingerprintResponse) error { cfg := req.Config enabled := cfg.ReadBoolDefault(lxcConfigOption, true) if !enabled && !cfg.DevMode { return nil } version := lxc.Version() if version == "" { return nil } resp.AddAttribute("driver.lxc.version", version) resp.AddAttribute("driver.lxc", "1") resp.Detected = true // Advertise if this node supports lxc volumes if d.config.ReadBoolDefault(lxcVolumesConfigOption, lxcVolumesConfigDefault) { resp.AddAttribute("driver."+lxcVolumesConfigOption, "1") } return nil } func (d *LxcDriver) Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) { return nil, nil } // Start starts the LXC Driver func (d *LxcDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error) { sresp, err, errCleanup := d.startWithCleanup(ctx, task) if err != nil { if cleanupErr := errCleanup(); cleanupErr != nil { d.logger.Printf("[ERR] error occurred while cleaning up from error in Start: %v", cleanupErr) } } return sresp, err } func (d *LxcDriver) startWithCleanup(ctx *ExecContext, task *structs.Task) (*StartResponse, error, func() error) { noCleanup := func() error { return nil } var driverConfig LxcDriverConfig if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil { return nil, err, noCleanup } lxcPath := lxc.DefaultConfigPath() if path := d.config.Read("driver.lxc.path"); path != "" { lxcPath = path } containerName := fmt.Sprintf("%s-%s", task.Name, d.DriverContext.allocID) c, err := lxc.NewContainer(containerName, lxcPath) if err != nil { return nil, fmt.Errorf("unable to initialize container: %v", err), noCleanup } var verbosity lxc.Verbosity switch driverConfig.Verbosity { case "verbose": verbosity = lxc.Verbose case "", "quiet": verbosity = lxc.Quiet default: return nil, fmt.Errorf("lxc driver config 'verbosity' can only be either quiet or verbose"), noCleanup } c.SetVerbosity(verbosity) var logLevel lxc.LogLevel switch driverConfig.LogLevel { case "trace": logLevel = lxc.TRACE case "debug": logLevel = lxc.DEBUG case "info": logLevel = lxc.INFO case "warn": logLevel = lxc.WARN case "", "error": logLevel = lxc.ERROR default: return nil, fmt.Errorf("lxc driver config 'log_level' can only be trace, debug, info, warn or error"), noCleanup } c.SetLogLevel(logLevel) logFile := filepath.Join(ctx.TaskDir.Dir, fmt.Sprintf("%v-lxc.log", task.Name)) c.SetLogFile(logFile) options := lxc.TemplateOptions{ Template: driverConfig.Template, Distro: driverConfig.Distro, Release: driverConfig.Release, Arch: driverConfig.Arch, FlushCache: driverConfig.FlushCache, DisableGPGValidation: driverConfig.DisableGPGValidation, ExtraArgs: driverConfig.TemplateArgs, } if err := c.Create(options); err != nil { return nil, fmt.Errorf("unable to create container: %v", err), noCleanup } // Set the network type to none if err := c.SetConfigItem("lxc.network.type", "none"); err != nil { return nil, fmt.Errorf("error setting network type configuration: %v", err), c.Destroy } // Bind mount the shared alloc dir and task local dir in the container mounts := []string{ fmt.Sprintf("%s local none rw,bind,create=dir", ctx.TaskDir.LocalDir), fmt.Sprintf("%s alloc none rw,bind,create=dir", ctx.TaskDir.SharedAllocDir), fmt.Sprintf("%s secrets none rw,bind,create=dir", ctx.TaskDir.SecretsDir), } volumesEnabled := d.config.ReadBoolDefault(lxcVolumesConfigOption, lxcVolumesConfigDefault) for _, volDesc := range driverConfig.Volumes { // the format was checked in Validate() paths := strings.Split(volDesc, ":") if filepath.IsAbs(paths[0]) { if !volumesEnabled { return nil, fmt.Errorf("absolute bind-mount volume in config but '%v' is false", lxcVolumesConfigOption), c.Destroy } } else { // Relative source paths are treated as relative to alloc dir paths[0] = filepath.Join(ctx.TaskDir.Dir, paths[0]) } mounts = append(mounts, fmt.Sprintf("%s %s none rw,bind,create=dir", paths[0], paths[1])) } for _, mnt := range mounts { if err := c.SetConfigItem("lxc.mount.entry", mnt); err != nil { return nil, fmt.Errorf("error setting bind mount %q error: %v", mnt, err), c.Destroy } } // Start the container if err := c.Start(); err != nil { return nil, fmt.Errorf("unable to start container: %v", err), c.Destroy } stopAndDestroyCleanup := func() error { if err := c.Stop(); err != nil { return err } return c.Destroy() } // Set the resource limits if err := c.SetMemoryLimit(lxc.ByteSize(task.Resources.MemoryMB) * lxc.MB); err != nil { return nil, fmt.Errorf("unable to set memory limits: %v", err), stopAndDestroyCleanup } if err := c.SetCgroupItem("cpu.shares", strconv.Itoa(task.Resources.CPU)); err != nil { return nil, fmt.Errorf("unable to set cpu shares: %v", err), stopAndDestroyCleanup } h := lxcDriverHandle{ container: c, initPid: c.InitPid(), lxcPath: lxcPath, logger: d.logger, killTimeout: GetKillTimeout(task.KillTimeout, d.DriverContext.config.MaxKillTimeout), maxKillTimeout: d.DriverContext.config.MaxKillTimeout, totalCpuStats: stats.NewCpuStats(), userCpuStats: stats.NewCpuStats(), systemCpuStats: stats.NewCpuStats(), waitCh: make(chan *dstructs.WaitResult, 1), doneCh: make(chan bool, 1), } go h.run() return &StartResponse{Handle: &h}, nil, noCleanup } func (d *LxcDriver) Cleanup(*ExecContext, *CreatedResources) error { return nil } // Open creates the driver to monitor an existing LXC container func (d *LxcDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) { pid := &lxcPID{} if err := json.Unmarshal([]byte(handleID), pid); err != nil { return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err) } var container *lxc.Container containers := lxc.Containers(pid.LxcPath) for _, c := range containers { if c.Name() == pid.ContainerName { container = &c break } } if container == nil { return nil, fmt.Errorf("container %v not found", pid.ContainerName) } handle := lxcDriverHandle{ container: container, initPid: container.InitPid(), lxcPath: pid.LxcPath, logger: d.logger, killTimeout: pid.KillTimeout, maxKillTimeout: d.DriverContext.config.MaxKillTimeout, totalCpuStats: stats.NewCpuStats(), userCpuStats: stats.NewCpuStats(), systemCpuStats: stats.NewCpuStats(), waitCh: make(chan *dstructs.WaitResult, 1), doneCh: make(chan bool, 1), } go handle.run() return &handle, nil } // lxcDriverHandle allows controlling the lifecycle of an lxc container type lxcDriverHandle struct { container *lxc.Container initPid int lxcPath string logger *log.Logger killTimeout time.Duration maxKillTimeout time.Duration totalCpuStats *stats.CpuStats userCpuStats *stats.CpuStats systemCpuStats *stats.CpuStats waitCh chan *dstructs.WaitResult doneCh chan bool } type lxcPID struct { ContainerName string InitPid int LxcPath string KillTimeout time.Duration } func (h *lxcDriverHandle) ID() string { pid := lxcPID{ ContainerName: h.container.Name(), InitPid: h.initPid, LxcPath: h.lxcPath, KillTimeout: h.killTimeout, } data, err := json.Marshal(pid) if err != nil { h.logger.Printf("[ERR] driver.lxc: failed to marshal lxc PID to JSON: %v", err) } return string(data) } func (h *lxcDriverHandle) WaitCh() chan *dstructs.WaitResult { return h.waitCh } func (h *lxcDriverHandle) Update(task *structs.Task) error { h.killTimeout = GetKillTimeout(task.KillTimeout, h.killTimeout) return nil } func (h *lxcDriverHandle) Exec(ctx context.Context, cmd string, args []string) ([]byte, int, error) { return nil, 0, fmt.Errorf("lxc driver cannot execute commands") } func (h *lxcDriverHandle) Kill() error { name := h.container.Name() h.logger.Printf("[INFO] driver.lxc: shutting down container %q", name) if err := h.container.Shutdown(h.killTimeout); err != nil { h.logger.Printf("[INFO] driver.lxc: shutting down container %q failed: %v", name, err) if err := h.container.Stop(); err != nil { h.logger.Printf("[ERR] driver.lxc: error stopping container %q: %v", name, err) } } close(h.doneCh) return nil } func (h *lxcDriverHandle) Signal(s os.Signal) error { return fmt.Errorf("LXC does not support signals") } func (d *lxcDriverHandle) Network() *cstructs.DriverNetwork { return nil } func (h *lxcDriverHandle) Stats() (*cstructs.TaskResourceUsage, error) { cpuStats, err := h.container.CPUStats() if err != nil { return nil, nil } total, err := h.container.CPUTime() if err != nil { return nil, nil } t := time.Now() // Get the cpu stats system := cpuStats["system"] user := cpuStats["user"] cs := &cstructs.CpuStats{ SystemMode: h.systemCpuStats.Percent(float64(system)), UserMode: h.systemCpuStats.Percent(float64(user)), Percent: h.totalCpuStats.Percent(float64(total)), TotalTicks: float64(user + system), Measured: LXCMeasuredCpuStats, } // Get the Memory Stats memData := map[string]uint64{ "rss": 0, "cache": 0, "swap": 0, } rawMemStats := h.container.CgroupItem("memory.stat") for _, rawMemStat := range rawMemStats { key, val, err := keysToVal(rawMemStat) if err != nil { h.logger.Printf("[ERR] driver.lxc: error getting stat for line %q", rawMemStat) continue } if _, ok := memData[key]; ok { memData[key] = val } } ms := &cstructs.MemoryStats{ RSS: memData["rss"], Cache: memData["cache"], Swap: memData["swap"], Measured: LXCMeasuredMemStats, } mu := h.container.CgroupItem("memory.max_usage_in_bytes") for _, rawMemMaxUsage := range mu { val, err := strconv.ParseUint(rawMemMaxUsage, 10, 64) if err != nil { h.logger.Printf("[ERR] driver.lxc: unable to get max memory usage: %v", err) continue } ms.MaxUsage = val } ku := h.container.CgroupItem("memory.kmem.usage_in_bytes") for _, rawKernelUsage := range ku { val, err := strconv.ParseUint(rawKernelUsage, 10, 64) if err != nil { h.logger.Printf("[ERR] driver.lxc: unable to get kernel memory usage: %v", err) continue } ms.KernelUsage = val } mku := h.container.CgroupItem("memory.kmem.max_usage_in_bytes") for _, rawMaxKernelUsage := range mku { val, err := strconv.ParseUint(rawMaxKernelUsage, 10, 64) if err != nil { h.logger.Printf("[ERR] driver.lxc: unable to get max kernel memory usage: %v", err) continue } ms.KernelMaxUsage = val } taskResUsage := cstructs.TaskResourceUsage{ ResourceUsage: &cstructs.ResourceUsage{ CpuStats: cs, MemoryStats: ms, }, Timestamp: t.UTC().UnixNano(), } return &taskResUsage, nil } func (h *lxcDriverHandle) run() { defer close(h.waitCh) timer := time.NewTimer(containerMonitorIntv) for { select { case <-timer.C: process, err := os.FindProcess(h.initPid) if err != nil { h.waitCh <- &dstructs.WaitResult{Err: err} return } if err := process.Signal(syscall.Signal(0)); err != nil { h.waitCh <- &dstructs.WaitResult{} return } timer.Reset(containerMonitorIntv) case <-h.doneCh: h.waitCh <- &dstructs.WaitResult{} return } } } func keysToVal(line string) (string, uint64, error) { tokens := strings.Split(line, " ") if len(tokens) != 2 { return "", 0, fmt.Errorf("line isn't a k/v pair") } key := tokens[0] val, err := strconv.ParseUint(tokens[1], 10, 64) return key, val, err }