open-nomad/drivers/lxc/driver.go
2018-11-28 10:09:10 -06:00

514 lines
14 KiB
Go

//+build linux,lxc
package lxc
import (
"context"
"fmt"
"strconv"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/plugins/shared/loader"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"gopkg.in/lxc/go-lxc.v2"
)
const (
// pluginName is the name of the plugin
pluginName = "lxc"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
)
var (
// PluginID is the rawexec plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
}
// PluginConfig is the rawexec factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(l hclog.Logger) interface{} { return NewLXCDriver(l) },
}
)
// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
conf := map[string]interface{}{}
if v, err := strconv.ParseBool(opts["driver.lxc.enable"]); err == nil {
conf["enabled"] = v
}
if v, err := strconv.ParseBool(opts["lxc.volumes.enabled"]); err == nil {
conf["volumes"] = v
}
if v, ok := opts["driver.lxc.path"]; ok {
conf["path"] = v
}
return conf, nil
}
var (
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
PluginVersion: "0.1.0",
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"enabled": hclspec.NewDefault(
hclspec.NewAttr("enabled", "bool", false),
hclspec.NewLiteral("true"),
),
"volumes": hclspec.NewDefault(
hclspec.NewAttr("volumes", "bool", false),
hclspec.NewLiteral("true"),
),
"path": hclspec.NewDefault(
hclspec.NewAttr("path", "string", false),
hclspec.NewLiteral("\"\""),
),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"template": hclspec.NewAttr("template", "string", true),
"distro": hclspec.NewAttr("distro", "string", false),
"release": hclspec.NewAttr("release", "string", false),
"arch": hclspec.NewAttr("arch", "string", false),
"image_variant": hclspec.NewAttr("image_variant", "string", false),
"image_server": hclspec.NewAttr("image_server", "string", false),
"gpg_key_id": hclspec.NewAttr("gpg_key_id", "string", false),
"gpg_key_server": hclspec.NewAttr("gpg_key_server", "string", false),
"disable_gpg": hclspec.NewAttr("disable_gpg", "string", false),
"flush_cache": hclspec.NewAttr("flush_cache", "string", false),
"force_cache": hclspec.NewAttr("force_cache", "string", false),
"template_args": hclspec.NewAttr("template_args", "list(string)", false),
"log_level": hclspec.NewAttr("log_level", "string", false),
"verbosity": hclspec.NewAttr("verbosity", "string", false),
"volumes": hclspec.NewAttr("volumes", "list(string)", false),
})
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
capabilities = &drivers.Capabilities{
SendSignals: false,
Exec: false,
FSIsolation: cstructs.FSIsolationImage,
}
)
// Driver is a driver for running LXC containers
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config *Config
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the Nomad agent
logger hclog.Logger
}
// Config is the driver configuration set by the SetConfig RPC call
type Config struct {
// Enabled is set to true to enable the lxc driver
Enabled bool `codec:"enabled"`
AllowVolumes bool `codec:"volumes"`
Path string `codec:"path"`
}
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
Template string `codec:"template"`
Distro string `codec:"distro"`
Release string `codec:"release"`
Arch string `codec:"arch"`
ImageVariant string `codec:"image_variant"`
ImageServer string `codec:"image_server"`
GPGKeyID string `codec:"gpg_key_id"`
GPGKeyServer string `codec:"gpg_key_server"`
DisableGPGValidation bool `codec:"disable_gpg"`
FlushCache bool `codec:"flush_cache"`
ForceCache bool `codec:"force_cache"`
TemplateArgs []string `codec:"template_args"`
LogLevel string `codec:"log_level"`
Verbosity string `codec:"verbosity"`
Volumes []string `codec:"volumes"`
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
TaskConfig *drivers.TaskConfig
ContainerName string
StartedAt time.Time
}
// NewLXCDriver returns a new DriverPlugin implementation
func NewLXCDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
}
}
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
func (d *Driver) SetConfig(data []byte, cfg *base.ClientAgentConfig) error {
var config Config
if err := base.MsgPackDecode(data, &config); err != nil {
return err
}
d.config = &config
if cfg != nil {
d.nomadConfig = cfg.Driver
}
return nil
}
func (d *Driver) Shutdown(ctx context.Context) error {
d.signalShutdown()
return nil
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
var health drivers.HealthState
var desc string
attrs := map[string]*pstructs.Attribute{}
lxcVersion := lxc.Version()
if d.config.Enabled && lxcVersion != "" {
health = drivers.HealthStateHealthy
desc = "ready"
attrs["driver.lxc"] = pstructs.NewBoolAttribute(true)
attrs["driver.lxc.version"] = pstructs.NewStringAttribute(lxcVersion)
} else {
health = drivers.HealthStateUndetected
desc = "disabled"
}
if d.config.AllowVolumes {
attrs["driver.lxc.volumes.enabled"] = pstructs.NewBoolAttribute(true)
}
return &drivers.Fingerprint{
Attributes: attrs,
Health: health,
HealthDescription: desc,
}
}
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
}
if _, ok := d.tasks.Get(handle.Config.ID); ok {
return nil
}
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
var driverConfig TaskConfig
if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
return fmt.Errorf("failed to decode driver config: %v", err)
}
c, err := lxc.NewContainer(taskState.ContainerName, d.lxcPath())
if err != nil {
return fmt.Errorf("failed to create container ref: %v", err)
}
initPid := c.InitPid()
h := &taskHandle{
container: c,
initPid: initPid,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
totalCpuStats: stats.NewCpuStats(),
userCpuStats: stats.NewCpuStats(),
systemCpuStats: stats.NewCpuStats(),
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
d.logger.Info("starting lxc task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(pluginName)
handle.Config = cfg
c, err := d.initializeContainer(cfg, driverConfig)
if err != nil {
return nil, nil, err
}
opt := toLXCCreateOptions(driverConfig)
if err := c.Create(opt); err != nil {
return nil, nil, fmt.Errorf("unable to create container: %v", err)
}
cleanup := func() {
if err := c.Destroy(); err != nil {
d.logger.Error("failed to clean up from an error in Start", "error", err)
}
}
if err := d.configureContainerNetwork(c); err != nil {
cleanup()
return nil, nil, err
}
if err := d.mountVolumes(c, cfg, driverConfig); err != nil {
cleanup()
return nil, nil, err
}
if err := c.Start(); err != nil {
cleanup()
return nil, nil, fmt.Errorf("unable to start container: %v", err)
}
if err := d.setResourceLimits(c, cfg); err != nil {
cleanup()
return nil, nil, err
}
pid := c.InitPid()
h := &taskHandle{
container: c,
initPid: pid,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
totalCpuStats: stats.NewCpuStats(),
userCpuStats: stats.NewCpuStats(),
systemCpuStats: stats.NewCpuStats(),
}
driverState := TaskState{
ContainerName: c.Name(),
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
cleanup()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
//
// Wait for process completion by polling status from handler.
// We cannot use the following alternatives:
// * Process.Wait() requires LXC container processes to be children
// of self process; but LXC runs container in separate PID hierarchy
// owned by PID 1.
// * lxc.Container.Wait() holds a write lock on container and prevents
// any other calls, including stats.
//
// Going with simplest approach of polling for handler to mark exit.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
s := handle.TaskStatus()
if s.State == drivers.TaskStateExited {
ch <- handle.exitResult
}
}
}
}
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if err := handle.shutdown(timeout); err != nil {
return fmt.Errorf("executor Shutdown failed: %v", err)
}
return nil
}
func (d *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
}
if handle.IsRunning() {
// grace period is chosen arbitrary here
if err := handle.shutdown(1 * time.Minute); err != nil {
handle.logger.Error("failed to destroy executor", "err", err)
}
}
d.tasks.Delete(taskID)
return nil
}
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.TaskStatus(), nil
}
func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.stats()
}
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
func (d *Driver) SignalTask(taskID string, signal string) error {
return fmt.Errorf("LXC driver does not support signals")
}
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
return nil, fmt.Errorf("LXC driver does not support exec")
}