From 3e3ca1ac0422470f5bbee95b5db8362312be66be Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 28 Aug 2018 10:30:57 -0700 Subject: [PATCH] Add stats to example plugin --- plugins/device/cmd/example/device.go | 260 ++++++++++++++++++++------- 1 file changed, 194 insertions(+), 66 deletions(-) diff --git a/plugins/device/cmd/example/device.go b/plugins/device/cmd/example/device.go index 59dbb5f30..8287d6373 100644 --- a/plugins/device/cmd/example/device.go +++ b/plugins/device/cmd/example/device.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "io/ioutil" + "os" "path/filepath" + "sync" "time" log "github.com/hashicorp/go-hclog" @@ -19,7 +21,7 @@ import ( const ( // pluginName is the name of the plugin - pluginName = "example-device" + pluginName = "example-fs-device" // vendor is the vendor providing the devices vendor = "nomad" @@ -42,7 +44,10 @@ var ( // configSpec is the specification of the plugin's configuration configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ - "dir": hclspec.NewAttr("dir", "string", true), + "dir": hclspec.NewDefault( + hclspec.NewAttr("dir", "string", false), + hclspec.NewLiteral("\".\""), + ), "list_period": hclspec.NewDefault( hclspec.NewAttr("list_period", "string", false), hclspec.NewLiteral("\"5s\""), @@ -51,6 +56,10 @@ var ( hclspec.NewAttr("unhealthy_perm", "string", false), hclspec.NewLiteral("\"-rwxrwxrwx\""), ), + "stats_period": hclspec.NewDefault( + hclspec.NewAttr("list_period", "string", false), + hclspec.NewLiteral("\"5s\""), + ), }) ) @@ -58,14 +67,15 @@ var ( type Config struct { Dir string `codec:"dir"` ListPeriod string `codec:"list_period"` + StatsPeriod string `codec:"stats_period"` UnhealthyPerm string `codec:"unhealthy_perm"` } -// ExampleDevice is an example device plugin. The device plugin exposes files as +// FsDevice is an example device plugin. The device plugin exposes files as // devices and periodically polls the directory for new files. If a file has a // given file permission, it is considered unhealthy. This device plugin is // purely for use as an example. -type ExampleDevice struct { +type FsDevice struct { logger log.Logger // deviceDir is the directory we expose as devices @@ -78,30 +88,35 @@ type ExampleDevice struct { // devices listPeriod time.Duration + // statsPeriod is how often we should collect statistics for fingerprinted + // devices. + statsPeriod time.Duration + // devices is the set of detected devices and maps whether they are healthy - devices map[string]bool + devices map[string]bool + deviceLock sync.RWMutex } // NewExampleDevice returns a new example device plugin. -func NewExampleDevice(log log.Logger) *ExampleDevice { - return &ExampleDevice{ +func NewExampleDevice(log log.Logger) *FsDevice { + return &FsDevice{ logger: log.Named(pluginName), devices: make(map[string]bool), } } // PluginInfo returns information describing the plugin. -func (d *ExampleDevice) PluginInfo() (*base.PluginInfoResponse, error) { +func (d *FsDevice) PluginInfo() (*base.PluginInfoResponse, error) { return pluginInfo, nil } // ConfigSchema returns the plugins configuration schema. -func (d *ExampleDevice) ConfigSchema() (*hclspec.Spec, error) { +func (d *FsDevice) ConfigSchema() (*hclspec.Spec, error) { return configSpec, nil } // SetConfig is used to set the configuration of the plugin. -func (d *ExampleDevice) SetConfig(data []byte) error { +func (d *FsDevice) SetConfig(data []byte) error { var config Config if err := base.MsgPackDecode(data, &config); err != nil { return err @@ -118,6 +133,13 @@ func (d *ExampleDevice) SetConfig(data []byte) error { } d.listPeriod = period + // Convert the stats period + speriod, err := time.ParseDuration(config.StatsPeriod) + if err != nil { + return fmt.Errorf("failed to parse list period %q: %v", config.StatsPeriod, err) + } + d.statsPeriod = speriod + d.logger.Debug("test debug") d.logger.Info("config set", "config", log.Fmt("% #v", pretty.Formatter(config))) return nil @@ -125,7 +147,7 @@ func (d *ExampleDevice) SetConfig(data []byte) error { // Fingerprint streams detected devices. If device changes are detected or the // devices health changes, messages will be emitted. -func (d *ExampleDevice) Fingerprint(ctx context.Context) (<-chan *device.FingerprintResponse, error) { +func (d *FsDevice) Fingerprint(ctx context.Context) (<-chan *device.FingerprintResponse, error) { if d.deviceDir == "" { return nil, status.New(codes.Internal, "device directory not set in config").Err() } @@ -136,15 +158,12 @@ func (d *ExampleDevice) Fingerprint(ctx context.Context) (<-chan *device.Fingerp } // fingerprint is the long running goroutine that detects hardware -func (d *ExampleDevice) fingerprint(ctx context.Context, devices chan *device.FingerprintResponse) { +func (d *FsDevice) fingerprint(ctx context.Context, devices chan *device.FingerprintResponse) { defer close(devices) // Create a timer that will fire immediately for the first detection ticker := time.NewTimer(0) - // Build an unhealthy message - unhealthyDesc := fmt.Sprintf("Device has bad permissions %q", d.unhealthyPerm) - for { select { case <-ctx.Done(): @@ -162,64 +181,79 @@ func (d *ExampleDevice) fingerprint(ctx context.Context, devices chan *device.Fi return } - var changes bool - fnames := make(map[string]struct{}) - for _, f := range files { - name := f.Name() - fnames[name] = struct{}{} - if f.IsDir() { - d.logger.Trace("skipping directory", "directory", name) - continue - } - - // Determine the health - perms := f.Mode().Perm().String() - healthy := perms != d.unhealthyPerm - d.logger.Trace("checking health", "file perm", perms, "unhealthy perms", d.unhealthyPerm, "healthy", healthy) - - // See if we alreay have the device - oldHealth, ok := d.devices[name] - if ok && oldHealth == healthy { - continue - } - - // Health has changed or we have a new object - changes = true - d.devices[name] = healthy - } - - for id := range d.devices { - if _, ok := fnames[id]; !ok { - delete(d.devices, id) - changes = true - } - } - - // Nothing to do - if !changes { + detected := d.diffFiles(files) + if len(detected) == 0 { continue } - // Build the devices - detected := make([]*device.Device, 0, len(d.devices)) - for name, healthy := range d.devices { - var desc string - if !healthy { - desc = unhealthyDesc - } - - detected = append(detected, &device.Device{ - ID: name, - Healthy: healthy, - HealthDesc: desc, - }) - } - devices <- device.NewFingerprint(getDeviceGroup(detected)) } } +func (d *FsDevice) diffFiles(files []os.FileInfo) []*device.Device { + d.deviceLock.Lock() + defer d.deviceLock.Unlock() + + // Build an unhealthy message + unhealthyDesc := fmt.Sprintf("Device has bad permissions %q", d.unhealthyPerm) + + var changes bool + fnames := make(map[string]struct{}) + for _, f := range files { + name := f.Name() + fnames[name] = struct{}{} + if f.IsDir() { + d.logger.Trace("skipping directory", "directory", name) + continue + } + + // Determine the health + perms := f.Mode().Perm().String() + healthy := perms != d.unhealthyPerm + d.logger.Trace("checking health", "file perm", perms, "unhealthy perms", d.unhealthyPerm, "healthy", healthy) + + // See if we alreay have the device + oldHealth, ok := d.devices[name] + if ok && oldHealth == healthy { + continue + } + + // Health has changed or we have a new object + changes = true + d.devices[name] = healthy + } + + for id := range d.devices { + if _, ok := fnames[id]; !ok { + delete(d.devices, id) + changes = true + } + } + + // Nothing to do + if !changes { + return nil + } + + // Build the devices + detected := make([]*device.Device, 0, len(d.devices)) + for name, healthy := range d.devices { + var desc string + if !healthy { + desc = unhealthyDesc + } + + detected = append(detected, &device.Device{ + ID: name, + Healthy: healthy, + HealthDesc: desc, + }) + } + + return detected +} + // getDeviceGroup is a helper to build the DeviceGroup given a set of devices. func getDeviceGroup(devices []*device.Device) *device.DeviceGroup { return &device.DeviceGroup{ @@ -231,7 +265,7 @@ func getDeviceGroup(devices []*device.Device) *device.DeviceGroup { } // Reserve returns information on how to mount the given devices. -func (d *ExampleDevice) Reserve(deviceIDs []string) (*device.ContainerReservation, error) { +func (d *FsDevice) Reserve(deviceIDs []string) (*device.ContainerReservation, error) { if len(deviceIDs) == 0 { return nil, status.New(codes.InvalidArgument, "no device ids given").Err() } @@ -254,3 +288,97 @@ func (d *ExampleDevice) Reserve(deviceIDs []string) (*device.ContainerReservatio return resp, nil } + +// Stats streams statistics for the detected devices. +func (d *FsDevice) Stats(ctx context.Context) (<-chan *device.StatsResponse, error) { + outCh := make(chan *device.StatsResponse) + go d.stats(ctx, outCh) + return outCh, nil +} + +// stats is the long running goroutine that streams device statistics +func (d *FsDevice) stats(ctx context.Context, stats chan *device.StatsResponse) { + defer close(stats) + + // Create a timer that will fire immediately for the first detection + ticker := time.NewTimer(0) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticker.Reset(d.listPeriod) + } + + deviceStats, err := d.collectStats() + if err != nil { + stats <- &device.StatsResponse{ + Error: err, + } + return + } + if deviceStats == nil { + continue + } + + stats <- &device.StatsResponse{ + Groups: []*device.DeviceGroupStats{deviceStats}, + } + } +} + +func (d *FsDevice) collectStats() (*device.DeviceGroupStats, error) { + d.deviceLock.RLock() + defer d.deviceLock.RUnlock() + l := len(d.devices) + if l == 0 { + return nil, nil + } + + now := time.Now() + group := &device.DeviceGroupStats{ + Vendor: vendor, + Type: deviceType, + Name: deviceName, + InstanceStats: make(map[string]*device.DeviceStats, l), + } + + for k := range d.devices { + p := filepath.Join(d.deviceDir, k) + f, err := os.Stat(p) + if err != nil { + return nil, fmt.Errorf("failed to stat %q: %v", p, err) + } + + s := &device.DeviceStats{ + Summary: &device.StatValue{ + IntNumeratorVal: f.Size(), + Unit: "bytes", + Desc: "Filesize in bytes", + }, + Stats: &device.StatObject{ + Attributes: map[string]*device.StatValue{ + "size": { + IntNumeratorVal: f.Size(), + Unit: "bytes", + Desc: "Filesize in bytes", + }, + "modify_time": { + StringVal: f.ModTime().String(), + Desc: "Last modified", + }, + "mode": { + StringVal: f.Mode().String(), + Desc: "File mode", + }, + }, + }, + Timestamp: now, + } + + group.InstanceStats[k] = s + } + + return group, nil +}