2019-10-22 13:20:26 +00:00
|
|
|
package csimanager
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2020-01-08 12:47:07 +00:00
|
|
|
"sync"
|
2019-10-22 13:20:26 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
"github.com/hashicorp/nomad/client/dynamicplugins"
|
|
|
|
"github.com/hashicorp/nomad/plugins/csi"
|
|
|
|
)
|
|
|
|
|
|
|
|
// managerFingerprintInterval is the interval at which the instance manager's
// run loop fingerprints its plugin and pushes the result to the node. It is
// also used as the per-request timeout for each fingerprint call.
const managerFingerprintInterval = 30 * time.Second
|
|
|
|
|
|
|
|
// instanceManager is used to manage the fingerprinting and supervision of a
// single CSI Plugin.
type instanceManager struct {
	// info is the registration record for the plugin from the dynamic
	// plugin registry; its Type decides whether node and/or controller
	// capabilities are fingerprinted.
	info   *dynamicplugins.PluginInfo
	logger hclog.Logger

	// updater is called with each fingerprint result to publish the
	// plugin's current state to the node.
	updater UpdateNodeCSIInfoFunc

	// shutdownCtx is cancelled by shutdown() to stop the run loop;
	// shutdownCh is closed by the run loop once it has fully exited,
	// allowing shutdown() to block until teardown is complete.
	shutdownCtx         context.Context
	shutdownCtxCancelFn context.CancelFunc
	shutdownCh          chan struct{}

	// mountPoint is the root of the mount dir where plugin specific data may be
	// stored and where mount points will be created
	mountPoint string

	// fp performs the periodic fingerprinting of the plugin.
	fp *pluginFingerprinter

	// volumeManager is lazily constructed by the run loop after the first
	// successful fingerprint of a node plugin. volumeManagerSetupCh is
	// closed once setup is complete so VolumeMounter callers can unblock;
	// volumeManagerSetup records completion and is guarded by
	// volumeManagerMu along with volumeManager itself.
	volumeManager        *volumeManager
	volumeManagerMu      sync.RWMutex
	volumeManagerSetupCh chan struct{}
	volumeManagerSetup   bool

	// client is the CSI RPC client for the plugin's socket; it is set by
	// run() and closed by the run loop on shutdown.
	client csi.CSIPlugin
}
|
|
|
|
|
|
|
|
func newInstanceManager(logger hclog.Logger, updater UpdateNodeCSIInfoFunc, p *dynamicplugins.PluginInfo) *instanceManager {
|
|
|
|
ctx, cancelFn := context.WithCancel(context.Background())
|
2020-01-08 12:47:07 +00:00
|
|
|
logger = logger.Named(p.Name)
|
2019-10-22 13:20:26 +00:00
|
|
|
return &instanceManager{
|
2020-01-08 12:47:07 +00:00
|
|
|
logger: logger,
|
2019-10-22 13:20:26 +00:00
|
|
|
info: p,
|
|
|
|
updater: updater,
|
|
|
|
|
2020-01-08 12:47:07 +00:00
|
|
|
fp: &pluginFingerprinter{
|
|
|
|
logger: logger.Named("fingerprinter"),
|
|
|
|
info: p,
|
|
|
|
fingerprintNode: p.Type == dynamicplugins.PluginTypeCSINode,
|
|
|
|
fingerprintController: p.Type == dynamicplugins.PluginTypeCSIController,
|
|
|
|
},
|
|
|
|
|
|
|
|
mountPoint: p.Options["MountPoint"],
|
|
|
|
|
|
|
|
volumeManagerSetupCh: make(chan struct{}),
|
2019-10-22 13:20:26 +00:00
|
|
|
|
|
|
|
shutdownCtx: ctx,
|
|
|
|
shutdownCtxCancelFn: cancelFn,
|
|
|
|
shutdownCh: make(chan struct{}),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i *instanceManager) run() {
|
2020-01-08 12:47:07 +00:00
|
|
|
c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
|
2019-10-22 13:20:26 +00:00
|
|
|
if err != nil {
|
|
|
|
i.logger.Error("failed to setup instance manager client", "error", err)
|
|
|
|
close(i.shutdownCh)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
i.client = c
|
2020-02-05 11:13:08 +00:00
|
|
|
i.fp.client = c
|
2019-10-22 13:20:26 +00:00
|
|
|
|
|
|
|
go i.runLoop()
|
|
|
|
}
|
|
|
|
|
2020-01-08 12:47:07 +00:00
|
|
|
// VolumeMounter returns the volume manager that is configured for the given plugin
|
|
|
|
// instance. If called before the volume manager has been setup, it will block until
|
|
|
|
// the volume manager is ready or the context is closed.
|
|
|
|
func (i *instanceManager) VolumeMounter(ctx context.Context) (VolumeMounter, error) {
|
|
|
|
var vm VolumeMounter
|
|
|
|
i.volumeManagerMu.RLock()
|
|
|
|
if i.volumeManagerSetup {
|
|
|
|
vm = i.volumeManager
|
|
|
|
}
|
|
|
|
i.volumeManagerMu.RUnlock()
|
|
|
|
|
|
|
|
if vm != nil {
|
|
|
|
return vm, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-i.volumeManagerSetupCh:
|
|
|
|
i.volumeManagerMu.RLock()
|
|
|
|
vm = i.volumeManager
|
|
|
|
i.volumeManagerMu.RUnlock()
|
|
|
|
return vm, nil
|
|
|
|
case <-ctx.Done():
|
|
|
|
return nil, ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-22 13:20:26 +00:00
|
|
|
func (i *instanceManager) requestCtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
|
|
|
|
return context.WithTimeout(i.shutdownCtx, timeout)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (i *instanceManager) runLoop() {
|
|
|
|
timer := time.NewTimer(0)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-i.shutdownCtx.Done():
|
|
|
|
if i.client != nil {
|
|
|
|
i.client.Close()
|
|
|
|
i.client = nil
|
|
|
|
}
|
|
|
|
close(i.shutdownCh)
|
|
|
|
return
|
|
|
|
case <-timer.C:
|
|
|
|
ctx, cancelFn := i.requestCtxWithTimeout(managerFingerprintInterval)
|
|
|
|
|
2020-01-08 12:47:07 +00:00
|
|
|
info := i.fp.fingerprint(ctx)
|
|
|
|
cancelFn()
|
|
|
|
i.updater(i.info.Name, info)
|
2019-10-22 13:20:26 +00:00
|
|
|
|
2020-01-08 12:47:07 +00:00
|
|
|
// TODO: refactor this lock into a faster, goroutine-local check
|
|
|
|
i.volumeManagerMu.RLock()
|
|
|
|
// When we've had a successful fingerprint, and the volume manager is not yet setup,
|
|
|
|
// and one is required (we're running a node plugin), then set one up now.
|
|
|
|
if i.fp.hadFirstSuccessfulFingerprint && !i.volumeManagerSetup && i.fp.fingerprintNode {
|
|
|
|
i.volumeManagerMu.RUnlock()
|
|
|
|
i.volumeManagerMu.Lock()
|
|
|
|
i.volumeManager = newVolumeManager(i.logger, i.client, i.mountPoint, info.NodeInfo.RequiresNodeStageVolume)
|
|
|
|
i.volumeManagerSetup = true
|
|
|
|
close(i.volumeManagerSetupCh)
|
|
|
|
i.volumeManagerMu.Unlock()
|
2019-10-22 13:20:26 +00:00
|
|
|
} else {
|
2020-01-08 12:47:07 +00:00
|
|
|
i.volumeManagerMu.RUnlock()
|
2019-10-22 13:20:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
timer.Reset(managerFingerprintInterval)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// shutdown stops the instance manager by cancelling its shutdown context,
// then blocks until the run loop has closed the CSI client and exited
// (signaled by shutdownCh being closed).
func (i *instanceManager) shutdown() {
	i.shutdownCtxCancelFn()
	<-i.shutdownCh
}
|