client: fix data races in config handling (#14139)

Before this change, Client had two copies of the config object: `config` and `configCopy`. There was no guidance on which to use where (other than `configCopy`'s comment saying to pass it to alloc runners); both were shared among goroutines and mutated in data-racy ways. At one point I think the idea was to have `config` be mutable and then grab a lock to overwrite `configCopy`'s pointer atomically. That would have allowed alloc runners to read their config copies in a data-race-safe way, but it isn't how the implementation actually worked.

This change takes the following approach to handling configs safely in the client:

1. `Client.config` is the only copy of the config, and all access must go through the `Client.configLock` mutex.
2. Since the mutex protects *only the config pointer itself, not the fields inside the Config struct,* all config mutation must be done on a *copy* of the config, and then Client's config pointer is overwritten while the mutex is held. Alloc runners and other goroutines holding the old config pointer will not see config updates (see the sketch after this list).
3. Deep copying is implemented on the Config struct to support the approach above. The TLS Keyloader is an exception because it has its own internal locking to support mutating in place. An unfortunate complication, but one I couldn't find a way to untangle in a timely fashion.
4. To facilitate deep copying I made an *internally backward-incompatible API change*: our `helper/funcs` used to turn containers (slices and maps) with 0 elements into nils. This probably saves a few memory allocations but makes it very easy to cause panics. Since my new config handling approach uses more copying, it became very difficult to ensure all code that used containers on configs could handle nils properly. Since this code has caused panics in the past, I fixed it: nil containers are copied as nil, but 0-element containers now return a new 0-element container. No more "downgrading" to nil! (A sketch of the new copy semantics follows below.)
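For reference, here is the pattern from items 1 and 2 in one self-contained sketch. `Config` and `Client` are simplified stand-ins for the real types in `client/config` and `client`; only the locking discipline matters here.

```go
package clientsketch

import "sync"

// Config is a simplified stand-in for client/config.Config.
type Config struct {
	StateDir string
	Servers  []string
}

// Copy returns a deep copy that is safe to mutate. Note the item 4
// semantics: a nil slice copies as nil, an empty slice as a new empty slice.
func (c *Config) Copy() *Config {
	if c == nil {
		return nil
	}
	nc := *c
	if c.Servers != nil {
		nc.Servers = make([]string, len(c.Servers))
		copy(nc.Servers, c.Servers)
	}
	return &nc
}

// Client holds the single authoritative config pointer.
type Client struct {
	configLock sync.Mutex
	config     *Config // access only with configLock held
}

// GetConfig returns the current config. Callers must not mutate it
// without calling Copy() first.
func (c *Client) GetConfig() *Config {
	c.configLock.Lock()
	defer c.configLock.Unlock()
	return c.config
}

// UpdateConfig mutates a copy of the config, then swaps the pointer while
// the lock is held, so goroutines still holding the old pointer never see
// a partially updated Config.
func (c *Client) UpdateConfig(cb func(*Config)) *Config {
	c.configLock.Lock()
	defer c.configLock.Unlock()
	newConfig := c.config.Copy()
	cb(newConfig)
	c.config = newConfig
	return newConfig
}
```

Writers never mutate through the shared pointer; they go through the callback, e.g. `c.UpdateConfig(func(conf *Config) { conf.StateDir = p })`, exactly as `Client.init()` does in the diff below.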
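And a sketch of the new container-copy semantics from item 4. The function names here are illustrative, not the real `helper` signatures (the real helpers also deep-copy element types where needed):

```go
package helpersketch

// copySlice: a nil slice copies as nil; a non-nil 0-element slice comes
// back as a new, non-nil 0-element slice. (The old helpers returned nil
// whenever len(s) == 0, silently "downgrading" empty containers to nil.)
func copySlice[E any](s []E) []E {
	if s == nil {
		return nil
	}
	out := make([]E, len(s))
	copy(out, s)
	return out
}

// copyMap applies the same nil-preserving rule to maps.
func copyMap[K comparable, V any](m map[K]V) map[K]V {
	if m == nil {
		return nil
	}
	out := make(map[K]V, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}
```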
Michael Schurter, 2022-08-18 16:32:04 -07:00 (committed via GitHub)
parent 8dba52cee2
commit 3b57df33e3
29 changed files with 799 additions and 497 deletions

View File

@ -81,7 +81,7 @@ func (c *Client) ResolveSecretToken(secretID string) (*structs.ACLToken, error)
func (c *Client) resolveTokenAndACL(secretID string) (*acl.ACL, *structs.ACLToken, error) {
// Fast-path if ACLs are disabled
if !c.config.ACLEnabled {
if !c.GetConfig().ACLEnabled {
return nil, nil, nil
}
defer metrics.MeasureSince([]string{"client", "acl", "resolve_token"}, time.Now())
@ -127,7 +127,7 @@ func (c *Client) resolveTokenValue(secretID string) (*structs.ACLToken, error) {
raw, ok := c.tokenCache.Get(secretID)
if ok {
cached := raw.(*cachedACLValue)
if cached.Age() <= c.config.ACLTokenTTL {
if cached.Age() <= c.GetConfig().ACLTokenTTL {
return cached.Token, nil
}
}
@ -179,7 +179,7 @@ func (c *Client) resolvePolicies(secretID string, policies []string) ([]*structs
// Check if the cached value is valid or expired
cached := raw.(*cachedACLValue)
if cached.Age() <= c.config.ACLPolicyTTL {
if cached.Age() <= c.GetConfig().ACLPolicyTTL {
out = append(out, cached.Policy)
} else {
expired = append(expired, cached.Policy)

View File

@ -42,7 +42,7 @@ func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPpr
}
// If ACLs are disabled, EnableDebug must be enabled
if aclObj == nil && !a.c.config.EnableDebug {
if aclObj == nil && !a.c.GetConfig().EnableDebug {
return structs.ErrPermissionDenied
}
@ -218,7 +218,7 @@ func (a *Agent) Host(args *structs.HostDataRequest, reply *structs.HostDataRespo
return err
}
if (aclObj != nil && !aclObj.AllowAgentRead()) ||
(aclObj == nil && !a.c.config.EnableDebug) {
(aclObj == nil && !a.c.GetConfig().EnableDebug) {
return structs.ErrPermissionDenied
}

View File

@ -171,15 +171,22 @@ type AllocRunner interface {
// are expected to register as a schedule-able node to the servers, and to
// run allocations as determined by the servers.
type Client struct {
config *config.Config
start time.Time
start time.Time
// stateDB is used to efficiently store client state.
stateDB state.StateDB
// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
configLock sync.RWMutex
// config must only be accessed with lock held. To update the config, use the
// Client.UpdateConfig() helper. If you need more fine grained control use
// the following pattern:
//
// c.configLock.Lock()
// newConfig := c.config.Copy()
// // <mutate newConfig>
// c.config = newConfig
// c.configLock.Unlock()
config *config.Config
configLock sync.Mutex
logger hclog.InterceptLogger
rpcLogger hclog.Logger
@ -433,14 +440,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
return nil, fmt.Errorf("node setup failed: %v", err)
}
// Store the config copy before restoring state but after it has been
// initialized.
c.configLock.Lock()
c.configCopy = c.config.Copy()
c.configLock.Unlock()
fingerprintManager := NewFingerprintManager(
c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node,
cfg.PluginSingletonLoader, c.GetConfig, cfg.Node,
c.shutdownCh, c.updateNodeFromFingerprint, c.logger)
c.pluginManagers = pluginmanager.New(c.logger)
@ -469,8 +470,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
// Setup the driver manager
driverConfig := &drivermanager.Config{
Logger: c.logger,
Loader: c.configCopy.PluginSingletonLoader,
PluginConfig: c.configCopy.NomadPluginConfig(),
Loader: cfg.PluginSingletonLoader,
PluginConfig: cfg.NomadPluginConfig(),
Updater: c.batchNodeUpdates.updateNodeFromDriver,
EventHandlerFactory: c.GetTaskEventHandler,
State: c.stateDB,
@ -484,10 +485,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
// Setup the device manager
devConfig := &devicemanager.Config{
Logger: c.logger,
Loader: c.configCopy.PluginSingletonLoader,
PluginConfig: c.configCopy.NomadPluginConfig(),
Loader: cfg.PluginSingletonLoader,
PluginConfig: cfg.NomadPluginConfig(),
Updater: c.batchNodeUpdates.updateNodeFromDevices,
StatsInterval: c.configCopy.StatsCollectionInterval,
StatsInterval: cfg.StatsCollectionInterval,
State: c.stateDB,
}
devManager := devicemanager.New(devConfig)
@ -513,7 +514,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
go c.heartbeatStop.watch()
// Add the stats collector
statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
statsCollector := stats.NewHostStatsCollector(c.logger, cfg.AllocDir, c.devicemanager.AllStats)
c.hostStatsCollector = statsCollector
// Add the garbage collector
@ -529,16 +530,14 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
go c.garbageCollector.Run()
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
if len(cfg.Servers) > 0 {
if _, err := c.setServersImpl(cfg.Servers, true); err != nil {
logger.Warn("none of the configured servers are valid", "error", err)
}
}
c.configLock.RUnlock()
// Setup Consul discovery if enabled
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
if cfg.ConsulConfig.ClientAutoJoin != nil && *cfg.ConsulConfig.ClientAutoJoin {
c.shutdownGroup.Go(c.consulDiscovery)
if c.servers.NumServers() == 0 {
// No configured servers; trigger discovery manually
@ -572,7 +571,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
"The safest way to proceed is to manually stop running task processes "+
"and remove Nomad's state and alloc directories before "+
"restarting. Lost allocations will be rescheduled.",
"state_dir", c.config.StateDir, "alloc_dir", c.config.AllocDir)
"state_dir", cfg.StateDir, "alloc_dir", cfg.AllocDir)
logger.Error("Corrupt state is often caused by a bug. Please " +
"report as much information as possible to " +
"https://github.com/hashicorp/nomad/issues")
@ -606,8 +605,9 @@ func (c *Client) Ready() <-chan struct{} {
// needed before we begin starting its various components.
func (c *Client) init() error {
// Ensure the state dir exists if we have one
if c.config.StateDir != "" {
if err := os.MkdirAll(c.config.StateDir, 0700); err != nil {
conf := c.GetConfig()
if conf.StateDir != "" {
if err := os.MkdirAll(conf.StateDir, 0700); err != nil {
return fmt.Errorf("failed creating state dir: %s", err)
}
@ -623,12 +623,14 @@ func (c *Client) init() error {
return fmt.Errorf("failed to find temporary directory for the StateDir: %v", err)
}
c.config.StateDir = p
conf = c.UpdateConfig(func(c *config.Config) {
c.StateDir = p
})
}
c.logger.Info("using state directory", "state_dir", c.config.StateDir)
c.logger.Info("using state directory", "state_dir", conf.StateDir)
// Open the state database
db, err := c.config.StateDBFactory(c.logger, c.config.StateDir)
db, err := conf.StateDBFactory(c.logger, conf.StateDir)
if err != nil {
return fmt.Errorf("failed to open state database: %v", err)
}
@ -646,8 +648,8 @@ func (c *Client) init() error {
c.stateDB = db
// Ensure the alloc dir exists if we have one
if c.config.AllocDir != "" {
if err := os.MkdirAll(c.config.AllocDir, 0711); err != nil {
if conf.AllocDir != "" {
if err := os.MkdirAll(conf.AllocDir, 0711); err != nil {
return fmt.Errorf("failed creating alloc dir: %s", err)
}
} else {
@ -667,32 +669,34 @@ func (c *Client) init() error {
return fmt.Errorf("failed to change directory permissions for the AllocDir: %v", err)
}
c.config.AllocDir = p
conf = c.UpdateConfig(func(c *config.Config) {
c.AllocDir = p
})
}
c.logger.Info("using alloc directory", "alloc_dir", c.config.AllocDir)
c.logger.Info("using alloc directory", "alloc_dir", conf.AllocDir)
reserved := "<none>"
if c.config.Node != nil && c.config.Node.ReservedResources != nil {
if conf.Node != nil && conf.Node.ReservedResources != nil {
// Node should always be non-nil due to initialization in the
// agent package, but don't risk a panic just for a long line.
reserved = c.config.Node.ReservedResources.Networks.ReservedHostPorts
reserved = conf.Node.ReservedResources.Networks.ReservedHostPorts
}
c.logger.Info("using dynamic ports",
"min", c.config.MinDynamicPort,
"max", c.config.MaxDynamicPort,
"min", conf.MinDynamicPort,
"max", conf.MaxDynamicPort,
"reserved", reserved,
)
// Ensure cgroups are created on linux platform
if runtime.GOOS == "linux" && c.cpusetManager != nil {
// use the client configuration for reservable_cores if set
cores := c.config.ReservableCores
cores := conf.ReservableCores
if len(cores) == 0 {
// otherwise lookup the effective cores from the parent cgroup
cores, err = cgutil.GetCPUsFromCgroup(c.config.CgroupParent)
cores, err = cgutil.GetCPUsFromCgroup(conf.CgroupParent)
if err != nil {
c.logger.Warn("failed to lookup cpuset from cgroup parent, and not set as reservable_cores", "parent", c.config.CgroupParent)
c.logger.Warn("failed to lookup cpuset from cgroup parent, and not set as reservable_cores", "parent", conf.CgroupParent)
// will continue with a disabled cpuset manager
}
}
@ -733,9 +737,9 @@ func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
// Keep the client configuration up to date as we use configuration values to
// decide on what type of connections to accept
c.configLock.Lock()
c.config.TLSConfig = newConfig
c.configLock.Unlock()
c.UpdateConfig(func(c *config.Config) {
c.TLSConfig = newConfig
})
c.connPool.ReloadTLS(tlsWrap)
@ -744,7 +748,8 @@ func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
// Reload allows a client to reload its configuration on the fly
func (c *Client) Reload(newConfig *config.Config) error {
shouldReloadTLS, err := tlsutil.ShouldReloadRPCConnections(c.config.TLSConfig, newConfig.TLSConfig)
existing := c.GetConfig()
shouldReloadTLS, err := tlsutil.ShouldReloadRPCConnections(existing.TLSConfig, newConfig.TLSConfig)
if err != nil {
c.logger.Error("error parsing TLS configuration", "error", err)
return err
@ -763,31 +768,50 @@ func (c *Client) Leave() error {
return nil
}
// GetConfig returns the config of the client
// GetConfig returns the config of the client. Do *not* mutate without first
// calling Copy().
func (c *Client) GetConfig() *config.Config {
c.configLock.Lock()
defer c.configLock.Unlock()
return c.configCopy
return c.config
}
// UpdateConfig allows mutating the configuration. The updated configuration is
// returned.
func (c *Client) UpdateConfig(cb func(*config.Config)) *config.Config {
c.configLock.Lock()
defer c.configLock.Unlock()
// Create a copy of the active config
newConfig := c.config.Copy()
// Pass the copy to the supplied callback for mutation
cb(newConfig)
// Set new config struct
c.config = newConfig
return newConfig
}
// Datacenter returns the datacenter for the given client
func (c *Client) Datacenter() string {
return c.config.Node.Datacenter
return c.GetConfig().Node.Datacenter
}
// Region returns the region for the given client
func (c *Client) Region() string {
return c.config.Region
return c.GetConfig().Region
}
// NodeID returns the node ID for the given client
func (c *Client) NodeID() string {
return c.config.Node.ID
return c.GetConfig().Node.ID
}
// secretNodeID returns the secret node ID for the given client
func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
return c.GetConfig().Node.SecretID
}
// Shutdown is used to tear down the client
@ -810,7 +834,7 @@ func (c *Client) Shutdown() error {
c.garbageCollector.Stop()
arGroup := group{}
if c.config.DevMode {
if c.GetConfig().DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
ar.Destroy()
@ -919,9 +943,7 @@ func (c *Client) RestartAllocation(allocID, taskName string) error {
// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
c.configLock.RLock()
defer c.configLock.RUnlock()
return c.configCopy.Node
return c.GetConfig().Node
}
// getAllocRunner returns an AllocRunner or an UnknownAllocation error if the
@ -1017,11 +1039,12 @@ func (c *Client) computeAllocatedDeviceGroupStats(devices []*structs.AllocatedDe
// allocation, and has been created by a trusted party that has privileged
// knowledge of the client's secret identifier
func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool {
if !c.config.ACLEnabled {
conf := c.GetConfig()
if !conf.ACLEnabled {
return true
}
return structs.CompareMigrateToken(allocID, c.secretNodeID(), migrateToken)
return structs.CompareMigrateToken(allocID, conf.Node.SecretID, migrateToken)
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
@ -1124,7 +1147,8 @@ func (c *Client) setServersImpl(in []string, force bool) (int, error) {
// If there are errors restoring a specific allocation it is marked
// as failed whenever possible.
func (c *Client) restoreState() error {
if c.config.DevMode {
conf := c.GetConfig()
if conf.DevMode {
return nil
}
@ -1168,11 +1192,10 @@ func (c *Client) restoreState() error {
prevAllocWatcher := allocwatcher.NoopPrevAlloc{}
prevAllocMigrator := allocwatcher.NoopPrevAlloc{}
c.configLock.RLock()
arConf := &allocrunner.Config{
Alloc: alloc,
Logger: c.logger,
ClientConfig: c.configCopy,
ClientConfig: conf,
StateDB: c.stateDB,
StateUpdater: c,
DeviceStatsReporter: c,
@ -1193,7 +1216,6 @@ func (c *Client) restoreState() error {
RPCClient: c,
Getter: c.getter,
}
c.configLock.RUnlock()
ar, err := allocrunner.NewAllocRunner(arConf)
if err != nil {
@ -1337,13 +1359,13 @@ func (c *Client) NumAllocs() int {
return n
}
// nodeID restores, or generates if necessary, a unique node ID and SecretID.
// The node ID is, if available, a persistent unique ID. The secret ID is a
// high-entropy random UUID.
func (c *Client) nodeID() (id, secret string, err error) {
// ensureNodeID restores, or generates if necessary, a unique node ID and
// SecretID. The node ID is, if available, a persistent unique ID. The secret
// ID is a high-entropy random UUID.
func ensureNodeID(conf *config.Config) (id, secret string, err error) {
var hostID string
hostInfo, err := host.Info()
if !c.config.NoHostUUID && err == nil {
if !conf.NoHostUUID && err == nil {
if hashed, ok := helper.HashUUID(hostInfo.HostID); ok {
hostID = hashed
}
@ -1356,19 +1378,19 @@ func (c *Client) nodeID() (id, secret string, err error) {
}
// Do not persist in dev mode
if c.config.DevMode {
if conf.DevMode {
return hostID, uuid.Generate(), nil
}
// Attempt to read existing ID
idPath := filepath.Join(c.config.StateDir, "client-id")
idPath := filepath.Join(conf.StateDir, "client-id")
idBuf, err := ioutil.ReadFile(idPath)
if err != nil && !os.IsNotExist(err) {
return "", "", err
}
// Attempt to read existing secret ID
secretPath := filepath.Join(c.config.StateDir, "secret-id")
secretPath := filepath.Join(conf.StateDir, "secret-id")
secretBuf, err := ioutil.ReadFile(secretPath)
if err != nil && !os.IsNotExist(err) {
return "", "", err
@ -1403,13 +1425,18 @@ func (c *Client) nodeID() (id, secret string, err error) {
// setupNode is used to setup the initial node
func (c *Client) setupNode() error {
node := c.config.Node
c.configLock.Lock()
defer c.configLock.Unlock()
newConfig := c.config.Copy()
node := newConfig.Node
if node == nil {
node = &structs.Node{}
c.config.Node = node
newConfig.Node = node
}
// Generate an ID and secret for the node
id, secretID, err := c.nodeID()
id, secretID, err := ensureNodeID(newConfig)
if err != nil {
return fmt.Errorf("node ID setup failed: %v", err)
}
@ -1436,8 +1463,8 @@ func (c *Client) setupNode() error {
}
if node.NodeResources == nil {
node.NodeResources = &structs.NodeResources{}
node.NodeResources.MinDynamicPort = c.config.MinDynamicPort
node.NodeResources.MaxDynamicPort = c.config.MaxDynamicPort
node.NodeResources.MinDynamicPort = newConfig.MinDynamicPort
node.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort
}
if node.ReservedResources == nil {
node.ReservedResources = &structs.NodeReservedResources{}
@ -1454,11 +1481,11 @@ func (c *Client) setupNode() error {
if node.Name == "" {
node.Name, _ = os.Hostname()
}
node.CgroupParent = c.config.CgroupParent
node.CgroupParent = newConfig.CgroupParent
if node.HostVolumes == nil {
if l := len(c.config.HostVolumes); l != 0 {
if l := len(newConfig.HostVolumes); l != 0 {
node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig, l)
for k, v := range c.config.HostVolumes {
for k, v := range newConfig.HostVolumes {
if _, err := os.Stat(v.Path); err != nil {
return fmt.Errorf("failed to validate volume %s, err: %v", v.Name, err)
}
@ -1467,9 +1494,9 @@ func (c *Client) setupNode() error {
}
}
if node.HostNetworks == nil {
if l := len(c.config.HostNetworks); l != 0 {
if l := len(newConfig.HostNetworks); l != 0 {
node.HostNetworks = make(map[string]*structs.ClientHostNetworkConfig, l)
for k, v := range c.config.HostNetworks {
for k, v := range newConfig.HostNetworks {
node.HostNetworks[k] = v.Copy()
}
}
@ -1494,6 +1521,7 @@ func (c *Client) setupNode() error {
node.Meta["connect.proxy_concurrency"] = defaultConnectProxyConcurrency
}
c.config = newConfig
return nil
}
@ -1504,34 +1532,35 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
defer c.configLock.Unlock()
nodeHasChanged := false
newConfig := c.config.Copy()
for name, newVal := range response.Attributes {
oldVal := c.config.Node.Attributes[name]
oldVal := newConfig.Node.Attributes[name]
if oldVal == newVal {
continue
}
nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Attributes, name)
delete(newConfig.Node.Attributes, name)
} else {
c.config.Node.Attributes[name] = newVal
newConfig.Node.Attributes[name] = newVal
}
}
// update node links and resources from the diff created from
// fingerprinting
for name, newVal := range response.Links {
oldVal := c.config.Node.Links[name]
oldVal := newConfig.Node.Links[name]
if oldVal == newVal {
continue
}
nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Links, name)
delete(newConfig.Node.Links, name)
} else {
c.config.Node.Links[name] = newVal
newConfig.Node.Links[name] = newVal
}
}
@ -1541,9 +1570,9 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
if response.Resources != nil {
response.Resources.Networks = updateNetworks(
response.Resources.Networks,
c.config)
if !c.config.Node.Resources.Equals(response.Resources) {
c.config.Node.Resources.Merge(response.Resources)
newConfig)
if !newConfig.Node.Resources.Equals(response.Resources) {
newConfig.Node.Resources.Merge(response.Resources)
nodeHasChanged = true
}
}
@ -1553,26 +1582,27 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp
if response.NodeResources != nil {
response.NodeResources.Networks = updateNetworks(
response.NodeResources.Networks,
c.config)
if !c.config.Node.NodeResources.Equals(response.NodeResources) {
c.config.Node.NodeResources.Merge(response.NodeResources)
newConfig)
if !newConfig.Node.NodeResources.Equals(response.NodeResources) {
newConfig.Node.NodeResources.Merge(response.NodeResources)
nodeHasChanged = true
}
response.NodeResources.MinDynamicPort = c.config.MinDynamicPort
response.NodeResources.MaxDynamicPort = c.config.MaxDynamicPort
if c.config.Node.NodeResources.MinDynamicPort != response.NodeResources.MinDynamicPort ||
c.config.Node.NodeResources.MaxDynamicPort != response.NodeResources.MaxDynamicPort {
response.NodeResources.MinDynamicPort = newConfig.MinDynamicPort
response.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort
if newConfig.Node.NodeResources.MinDynamicPort != response.NodeResources.MinDynamicPort ||
newConfig.Node.NodeResources.MaxDynamicPort != response.NodeResources.MaxDynamicPort {
nodeHasChanged = true
}
}
if nodeHasChanged {
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
return c.configCopy.Node
return newConfig.Node
}
// updateNetworks filters and overrides network speed of host networks based
@ -1613,7 +1643,7 @@ func updateNetworks(up structs.Networks, c *config.Config) structs.Networks {
// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
if c.GetConfig().DevMode {
return devModeRetryIntv
}
return base + helper.RandomStagger(base)
@ -1635,7 +1665,7 @@ func (c *Client) registerAndHeartbeat() {
// we want to do this quickly. We want to do it extra quickly
// in development mode.
var heartbeat <-chan time.Time
if c.config.DevMode {
if c.GetConfig().DevMode {
heartbeat = time.After(0)
} else {
heartbeat = time.After(helper.RandomStagger(initialHeartbeatStagger))
@ -1679,7 +1709,7 @@ func (c *Client) lastHeartbeat() time.Time {
// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
// another heartbeat.
func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
if c.config.DevMode {
if c.GetConfig().DevMode {
return devModeRetryIntv
}
@ -1865,9 +1895,8 @@ func (c *Client) retryRegisterNode() {
// registerNode is used to register the node or update the registration
func (c *Client) registerNode() error {
node := c.Node()
req := structs.NodeRegisterRequest{
Node: node,
Node: c.Node(),
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.NodeUpdateResponse
@ -1876,10 +1905,9 @@ func (c *Client) registerNode() error {
}
// Update the node status to ready after we register.
c.configLock.Lock()
node.Status = structs.NodeStatusReady
c.config.Node.Status = structs.NodeStatusReady
c.configLock.Unlock()
c.UpdateConfig(func(c *config.Config) {
c.Node.Status = structs.NodeStatusReady
})
c.logger.Info("node registration complete")
if len(resp.EvalIDs) != 0 {
@ -2262,14 +2290,9 @@ OUTER:
}
}
// updateNode updates the Node copy and triggers the client to send the updated
// Node to the server. This should be done while the caller holds the
// configLock lock.
func (c *Client) updateNodeLocked() {
// Update the config copy.
node := c.config.Node.Copy()
c.configCopy.Node = node
// updateNode signals the client to send the updated
// Node to the server.
func (c *Client) updateNode() {
select {
case c.triggerNodeUpdate <- struct{}{}:
// Node update goroutine was released to execute
@ -2499,20 +2522,16 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
PreviousRunner: c.allocs[alloc.PreviousAllocation],
PreemptedRunners: preemptedAllocs,
RPC: c,
Config: c.configCopy,
Config: c.GetConfig(),
MigrateToken: migrateToken,
Logger: c.logger,
}
prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
// we don't have to do a copy.
c.configLock.RLock()
arConf := &allocrunner.Config{
Alloc: alloc,
Logger: c.logger,
ClientConfig: c.configCopy,
ClientConfig: c.GetConfig(),
StateDB: c.stateDB,
Consul: c.consulService,
ConsulProxies: c.consulProxies,
@ -2532,7 +2551,6 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
RPCClient: c,
Getter: c.getter,
}
c.configLock.RUnlock()
ar, err := allocrunner.NewAllocRunner(arConf)
if err != nil {
@ -2561,7 +2579,7 @@ func (c *Client) setupConsulTokenClient() error {
// with vault.
func (c *Client) setupVaultClient() error {
var err error
c.vaultClient, err = vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken)
c.vaultClient, err = vaultclient.NewVaultClient(c.GetConfig().VaultConfig, c.logger, c.deriveToken)
if err != nil {
return err
}
@ -2582,7 +2600,7 @@ func (c *Client) setupVaultClient() error {
func (c *Client) setupNomadServiceRegistrationHandler() {
cfg := nsd.ServiceRegistrationHandlerCfg{
Datacenter: c.Datacenter(),
Enabled: c.config.NomadServiceDiscovery,
Enabled: c.GetConfig().NomadServiceDiscovery,
NodeID: c.NodeID(),
NodeSecret: c.secretNodeID(),
Region: c.Region(),
@ -2762,7 +2780,8 @@ func taskIsPresent(taskName string, tasks []*structs.Task) bool {
// triggerDiscovery causes a Consul discovery to begin (if one hasn't already)
func (c *Client) triggerDiscovery() {
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
config := c.GetConfig()
if config.ConsulConfig.ClientAutoJoin != nil && *config.ConsulConfig.ClientAutoJoin {
select {
case c.triggerDiscoveryCh <- struct{}{}:
// Discovery goroutine was released to execute
@ -2815,7 +2834,7 @@ func (c *Client) consulDiscoveryImpl() error {
},
}
serviceName := c.configCopy.ConsulConfig.ServerServiceName
serviceName := c.GetConfig().ConsulConfig.ServerServiceName
var mErr multierror.Error
var nomadServers servers.Servers
consulLogger.Debug("bootstrap contacting Consul DCs", "consul_dcs", dcs)
@ -2909,13 +2928,14 @@ func (c *Client) emitStats() {
next := time.NewTimer(0)
defer next.Stop()
for {
config := c.GetConfig()
select {
case <-next.C:
err := c.hostStatsCollector.Collect()
next.Reset(c.config.StatsCollectionInterval)
next.Reset(config.StatsCollectionInterval)
if err != nil {
c.logger.Warn("error fetching host resource usage stats", "error", err)
} else if c.config.PublishNodeMetrics {
} else if config.PublishNodeMetrics {
// Publish Node metrics if operator has opted in
c.emitHostStats()
}
@ -2976,9 +2996,7 @@ func (c *Client) setGaugeForDiskStats(nodeID string, hStats *stats.HostStats, ba
// setGaugeForAllocationStats proxies metrics for allocation specific statistics
func (c *Client) setGaugeForAllocationStats(nodeID string, baseLabels []metrics.Label) {
c.configLock.RLock()
node := c.configCopy.Node
c.configLock.RUnlock()
node := c.GetConfig().Node
total := node.NodeResources
res := node.ReservedResources
allocated := c.getAllocatedResources(node)
@ -3077,14 +3095,11 @@ func (c *Client) emitClientMetrics() {
// labels takes the base labels and appends the node state
func (c *Client) labels() []metrics.Label {
c.configLock.RLock()
nodeStatus := c.configCopy.Node.Status
nodeEligibility := c.configCopy.Node.SchedulingEligibility
c.configLock.RUnlock()
node := c.Node()
return append(c.baseLabels,
metrics.Label{Name: "node_status", Value: nodeStatus},
metrics.Label{Name: "node_scheduling_eligibility", Value: nodeEligibility},
metrics.Label{Name: "node_status", Value: node.Status},
metrics.Label{Name: "node_scheduling_eligibility", Value: node.SchedulingEligibility},
)
}

View File

@ -191,63 +191,55 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
})
defer cleanup()
node := c1.config.Node
{
// Ensure the mock driver is registered on the client
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
// Ensure the mock driver is registered on the client
testutil.WaitForResult(func() (bool, error) {
node := c1.Node()
// assert that the driver is set on the node attributes
mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
if mockDriverInfoAttr == "" {
return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
}
// assert that the driver is set on the node attributes
mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
if mockDriverInfoAttr == "" {
return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
}
mockDriverInfo := node.Drivers["mock_driver"]
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should be set as detected")
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if !mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should be set as detected")
}
if !mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
{
testutil.WaitForResult(func() (bool, error) {
c1.configLock.Lock()
defer c1.configLock.Unlock()
mockDriverInfo := node.Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should not be set as detected")
}
if mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should not be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
testutil.WaitForResult(func() (bool, error) {
mockDriverInfo := c1.Node().Drivers["mock_driver"]
// assert that the Driver information for the node is also set correctly
if mockDriverInfo == nil {
return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
}
if mockDriverInfo.Detected {
return false, fmt.Errorf("mock driver should not be set as detected")
}
if mockDriverInfo.Healthy {
return false, fmt.Errorf("mock driver should not be set as healthy")
}
if mockDriverInfo.HealthDescription == "" {
return false, fmt.Errorf("mock driver description should not be empty")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
// TestClient_MixedTLS asserts that when a server is running with TLS enabled
@ -1115,17 +1107,18 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
})
// initial check
conf := client.GetConfig()
expectedResources := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
Networks: conf.Node.NodeResources.Networks,
NodeNetworks: conf.Node.NodeResources.NodeNetworks,
Disk: conf.Node.NodeResources.Disk,
// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Memory: structs.NodeMemoryResources{MemoryMB: 1024},
Devices: []*structs.NodeDeviceResource{
@ -1136,7 +1129,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
}
assert.EqualValues(t, expectedResources, client.configCopy.Node.NodeResources)
assert.EqualValues(t, expectedResources, conf.Node.NodeResources)
// overrides of values
@ -1157,17 +1150,19 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
})
conf = client.GetConfig()
expectedResources2 := &structs.NodeResources{
// computed through test client initialization
Networks: client.configCopy.Node.NodeResources.Networks,
NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
Disk: client.configCopy.Node.NodeResources.Disk,
Networks: conf.Node.NodeResources.Networks,
NodeNetworks: conf.Node.NodeResources.NodeNetworks,
Disk: conf.Node.NodeResources.Disk,
// injected
Cpu: structs.NodeCpuResources{
CpuShares: 123,
ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
ReservableCpuCores: conf.Node.NodeResources.Cpu.ReservableCpuCores,
TotalCpuCores: conf.Node.NodeResources.Cpu.TotalCpuCores,
},
Memory: structs.NodeMemoryResources{MemoryMB: 2048},
Devices: []*structs.NodeDeviceResource{
@ -1182,7 +1177,7 @@ func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
},
}
assert.EqualValues(t, expectedResources2, client.configCopy.Node.NodeResources)
assert.EqualValues(t, expectedResources2, conf.Node.NodeResources)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul-template/config"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/command/agent/host"
"golang.org/x/exp/slices"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/state"
@ -698,8 +699,11 @@ func (rc *RetryConfig) ToConsulTemplate() (*config.RetryConfig, error) {
}
func (c *Config) Copy() *Config {
nc := new(Config)
*nc = *c
if c == nil {
return nil
}
nc := *c
nc.Node = nc.Node.Copy()
nc.Servers = helper.CopySliceString(nc.Servers)
nc.Options = helper.CopyMapStringString(nc.Options)
@ -707,12 +711,9 @@ func (c *Config) Copy() *Config {
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
nc.TemplateConfig = c.TemplateConfig.Copy()
if c.ReservableCores != nil {
nc.ReservableCores = make([]uint16, len(c.ReservableCores))
copy(nc.ReservableCores, c.ReservableCores)
}
nc.ReservableCores = slices.Clone(c.ReservableCores)
nc.Artifact = c.Artifact.Copy()
return nc
return &nc
}
// DefaultConfig returns the default configuration

View File

@ -22,10 +22,11 @@ func TestDriverManager_Fingerprint_Run(t *testing.T) {
testClient, cleanup := TestClient(t, nil)
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,
@ -35,7 +36,7 @@ func TestDriverManager_Fingerprint_Run(t *testing.T) {
defer dm.Shutdown()
testutil.WaitForResult(func() (bool, error) {
node := testClient.configCopy.Node
node := testClient.Node()
d, ok := node.Drivers["mock_driver"]
if !ok {
@ -73,10 +74,11 @@ func TestDriverManager_Fingerprint_Periodic(t *testing.T) {
})
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,
@ -134,10 +136,11 @@ func TestDriverManager_NodeAttributes_Run(t *testing.T) {
})
defer cleanup()
conf := testClient.GetConfig()
dm := drivermanager.New(&drivermanager.Config{
Logger: testClient.logger,
Loader: testClient.config.PluginSingletonLoader,
PluginConfig: testClient.configCopy.NomadPluginConfig(),
Loader: conf.PluginSingletonLoader,
PluginConfig: conf.NomadPluginConfig(),
Updater: testClient.updateNodeFromDriver,
EventHandlerFactory: testClient.GetTaskEventHandler,
State: testClient.stateDB,

View File

@ -41,18 +41,20 @@ SEND_BATCH:
c.configLock.Lock()
defer c.configLock.Unlock()
newConfig := c.config.Copy()
// csi updates
var csiChanged bool
c.batchNodeUpdates.batchCSIUpdates(func(name string, info *structs.CSIInfo) {
if c.updateNodeFromCSIControllerLocked(name, info) {
if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSIControllerLocked(name, info, newConfig.Node) {
if newConfig.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
}
csiChanged = true
}
if c.updateNodeFromCSINodeLocked(name, info) {
if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSINodeLocked(name, info, newConfig.Node) {
if newConfig.Node.CSINodePlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSINodePlugins[name].UpdateTime = time.Now()
}
csiChanged = true
}
@ -61,10 +63,10 @@ SEND_BATCH:
// driver node updates
var driverChanged bool
c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
if c.updateNodeFromDriverLocked(driver, info) {
c.config.Node.Drivers[driver] = info
if c.config.Node.Drivers[driver].UpdateTime.IsZero() {
c.config.Node.Drivers[driver].UpdateTime = time.Now()
if c.applyNodeUpdatesFromDriver(driver, info, newConfig.Node) {
newConfig.Node.Drivers[driver] = info
if newConfig.Node.Drivers[driver].UpdateTime.IsZero() {
newConfig.Node.Drivers[driver].UpdateTime = time.Now()
}
driverChanged = true
}
@ -80,7 +82,8 @@ SEND_BATCH:
// only update the node if changes occurred
if driverChanged || devicesChanged || csiChanged {
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
close(c.fpInitialized)
@ -92,24 +95,27 @@ func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
c.configLock.Lock()
defer c.configLock.Unlock()
newConfig := c.config.Copy()
changed := false
if c.updateNodeFromCSIControllerLocked(name, info) {
if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSIControllerLocked(name, info, newConfig.Node) {
if newConfig.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
}
changed = true
}
if c.updateNodeFromCSINodeLocked(name, info) {
if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
if c.updateNodeFromCSINodeLocked(name, info, newConfig.Node) {
if newConfig.Node.CSINodePlugins[name].UpdateTime.IsZero() {
newConfig.Node.CSINodePlugins[name].UpdateTime = time.Now()
}
changed = true
}
if changed {
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
}
@ -119,7 +125,7 @@ func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
//
// It is safe to call for all CSI Updates, but will only perform changes when
// a ControllerInfo field is present.
func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo) bool {
func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo, node *structs.Node) bool {
var changed bool
if info.ControllerInfo == nil {
return false
@ -127,15 +133,15 @@ func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CS
i := info.Copy()
i.NodeInfo = nil
oldController, hadController := c.config.Node.CSIControllerPlugins[name]
oldController, hadController := node.CSIControllerPlugins[name]
if !hadController {
// If the controller info has not yet been set, do that here
changed = true
c.config.Node.CSIControllerPlugins[name] = i
node.CSIControllerPlugins[name] = i
} else {
// The controller info has already been set, fix it up
if !oldController.Equal(i) {
c.config.Node.CSIControllerPlugins[name] = i
node.CSIControllerPlugins[name] = i
changed = true
}
@ -162,7 +168,7 @@ func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CS
//
// It is safe to call for all CSI Updates, but will only perform changes when
// a NodeInfo field is present.
func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo) bool {
func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo, node *structs.Node) bool {
var changed bool
if info.NodeInfo == nil {
return false
@ -170,15 +176,15 @@ func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo)
i := info.Copy()
i.ControllerInfo = nil
oldNode, hadNode := c.config.Node.CSINodePlugins[name]
oldNode, hadNode := node.CSINodePlugins[name]
if !hadNode {
// If the Node info has not yet been set, do that here
changed = true
c.config.Node.CSINodePlugins[name] = i
node.CSINodePlugins[name] = i
} else {
// The node info has already been set, fix it up
if !oldNode.Equal(info) {
c.config.Node.CSINodePlugins[name] = i
node.CSINodePlugins[name] = i
changed = true
}
@ -205,30 +211,33 @@ func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
c.configLock.Lock()
defer c.configLock.Unlock()
if c.updateNodeFromDriverLocked(name, info) {
c.config.Node.Drivers[name] = info
if c.config.Node.Drivers[name].UpdateTime.IsZero() {
c.config.Node.Drivers[name].UpdateTime = time.Now()
newConfig := c.config.Copy()
if c.applyNodeUpdatesFromDriver(name, info, newConfig.Node) {
newConfig.Node.Drivers[name] = info
if newConfig.Node.Drivers[name].UpdateTime.IsZero() {
newConfig.Node.Drivers[name].UpdateTime = time.Now()
}
c.updateNodeLocked()
c.config = newConfig
c.updateNode()
}
}
// updateNodeFromDriverLocked makes the changes to the node from a driver update
// but does not send the update to the server. c.configLock must be held before
// calling this func
func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
// applyNodeUpdatesFromDriver applies changes to the passed in node. true is
// returned if the node has changed.
func (c *Client) applyNodeUpdatesFromDriver(name string, info *structs.DriverInfo, node *structs.Node) bool {
var hasChanged bool
hadDriver := c.config.Node.Drivers[name] != nil
hadDriver := node.Drivers[name] != nil
if !hadDriver {
// If the driver info has not yet been set, do that here
hasChanged = true
for attrName, newVal := range info.Attributes {
c.config.Node.Attributes[attrName] = newVal
node.Attributes[attrName] = newVal
}
} else {
oldVal := c.config.Node.Drivers[name]
oldVal := node.Drivers[name]
// The driver info has already been set, fix it up
if oldVal.Detected != info.Detected {
hasChanged = true
@ -247,16 +256,16 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
}
for attrName, newVal := range info.Attributes {
oldVal := c.config.Node.Drivers[name].Attributes[attrName]
oldVal := node.Drivers[name].Attributes[attrName]
if oldVal == newVal {
continue
}
hasChanged = true
if newVal == "" {
delete(c.config.Node.Attributes, attrName)
delete(node.Attributes, attrName)
} else {
c.config.Node.Attributes[attrName] = newVal
node.Attributes[attrName] = newVal
}
}
}
@ -266,16 +275,14 @@ func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInf
// their attributes as DriverInfo
driverName := fmt.Sprintf("driver.%s", name)
if info.Detected {
c.config.Node.Attributes[driverName] = "1"
node.Attributes[driverName] = "1"
} else {
delete(c.config.Node.Attributes, driverName)
delete(node.Attributes, driverName)
}
return hasChanged
}
// updateNodeFromFingerprint updates the node with the result of
// fingerprinting the node from the diff that was created
func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
c.configLock.Lock()
defer c.configLock.Unlock()
@ -284,7 +291,7 @@ func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
// dispatched task resources and not appropriate for expressing
// node available device resources
if c.updateNodeFromDevicesLocked(devices) {
c.updateNodeLocked()
c.updateNode()
}
}
@ -294,7 +301,9 @@ func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
c.logger.Debug("new devices detected", "devices", len(devices))
c.config.Node.NodeResources.Devices = devices
newConfig := c.config.Copy()
newConfig.Node.NodeResources.Devices = devices
c.config = newConfig
return true
}

View File

@ -47,9 +47,11 @@ func (c *Client) StreamingRpcHandler(method string) (structs.StreamingRpcHandler
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
conf := c.GetConfig()
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
if conf.RPCHandler != nil {
return conf.RPCHandler.RPC(method, args, reply)
}
// We will try to automatically retry requests that fail due to things like server unavailability
@ -60,7 +62,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// to the leader they may also allow for an RPCHoldTimeout while waiting for leader election.
// That's OK, we won't double up because we are using it here not as a sleep but
// as a hint to give up
deadline = deadline.Add(c.config.RPCHoldTimeout)
deadline = deadline.Add(conf.RPCHoldTimeout)
// If its a blocking query, allow the time specified by the request
if info, ok := args.(structs.RPCInfo); ok {
@ -109,7 +111,7 @@ TRY:
}
// Wait to avoid thundering herd
timer, cancel := helper.NewSafeTimer(helper.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction))
timer, cancel := helper.NewSafeTimer(helper.RandomStagger(conf.RPCHoldTimeout / structs.JitterFraction))
defer cancel()
select {

View File

@ -94,7 +94,6 @@ func TestRPCOnlyClient(t testing.T, srvAddr net.Addr, rpcs map[string]interface{
client := &Client{config: conf, logger: testlog.HCLogger(t)}
client.servers = servers.New(client.logger, client.shutdownCh, client)
client.configCopy = client.config.Copy()
client.rpcServer = rpc.NewServer()
for name, rpc := range rpcs {

View File

@ -1150,15 +1150,17 @@ func (a *Agent) Reload(newConfig *Config) error {
a.configLock.Lock()
defer a.configLock.Unlock()
updatedLogging := newConfig != nil && (newConfig.LogLevel != a.config.LogLevel)
current := a.config.Copy()
updatedLogging := newConfig != nil && (newConfig.LogLevel != current.LogLevel)
if newConfig == nil || newConfig.TLSConfig == nil && !updatedLogging {
return fmt.Errorf("cannot reload agent with nil configuration")
}
if updatedLogging {
a.config.LogLevel = newConfig.LogLevel
a.logger.SetLevel(log.LevelFromString(newConfig.LogLevel))
current.LogLevel = newConfig.LogLevel
a.logger.SetLevel(log.LevelFromString(current.LogLevel))
}
// Update eventer config
@ -1178,10 +1180,10 @@ func (a *Agent) Reload(newConfig *Config) error {
// Completely reload the agent's TLS configuration (moving from non-TLS to
// TLS, or vice versa)
// This does not handle errors in loading the new TLS configuration
a.config.TLSConfig = newConfig.TLSConfig.Copy()
current.TLSConfig = newConfig.TLSConfig.Copy()
}
if !a.config.TLSConfig.IsEmpty() && !newConfig.TLSConfig.IsEmpty() {
if !current.TLSConfig.IsEmpty() && !newConfig.TLSConfig.IsEmpty() {
// This is just a TLS configuration reload, we don't need to refresh
// existing network connections
@ -1190,26 +1192,31 @@ func (a *Agent) Reload(newConfig *Config) error {
// as this allows us to dynamically reload configurations not only
// on the Agent but on the Server and Client too (they are
// referencing the same keyloader).
keyloader := a.config.TLSConfig.GetKeyLoader()
keyloader := current.TLSConfig.GetKeyLoader()
_, err := keyloader.LoadKeyPair(newConfig.TLSConfig.CertFile, newConfig.TLSConfig.KeyFile)
if err != nil {
return err
}
a.config.TLSConfig = newConfig.TLSConfig
a.config.TLSConfig.KeyLoader = keyloader
current.TLSConfig = newConfig.TLSConfig
current.TLSConfig.KeyLoader = keyloader
a.config = current
return nil
} else if newConfig.TLSConfig.IsEmpty() && !a.config.TLSConfig.IsEmpty() {
} else if newConfig.TLSConfig.IsEmpty() && !current.TLSConfig.IsEmpty() {
a.logger.Warn("downgrading agent's existing TLS configuration to plaintext")
fullUpdateTLSConfig()
} else if !newConfig.TLSConfig.IsEmpty() && a.config.TLSConfig.IsEmpty() {
} else if !newConfig.TLSConfig.IsEmpty() && current.TLSConfig.IsEmpty() {
a.logger.Info("upgrading from plaintext configuration to TLS")
fullUpdateTLSConfig()
}
// Set agent config to the updated config
a.config = current
return nil
}
// GetConfig creates a locked reference to the agent's config
// GetConfig returns the current agent configuration. The Config should *not*
// be mutated directly. First call Config.Copy.
func (a *Agent) GetConfig() *Config {
a.configLock.Lock()
defer a.configLock.Unlock()

View File

@ -23,7 +23,6 @@ import (
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
)
type Member struct {
@ -81,11 +80,8 @@ func (s *HTTPServer) AgentSelfRequest(resp http.ResponseWriter, req *http.Reques
Member: nomadMember(member),
Stats: s.agent.Stats(),
}
if ac, err := copystructure.Copy(s.agent.config); err != nil {
return nil, CodedError(500, err.Error())
} else {
self.Config = ac.(*Config)
}
self.Config = s.agent.GetConfig().Copy()
if self.Config != nil && self.Config.Vault != nil && self.Config.Vault.Token != "" {
self.Config.Vault.Token = "<redacted>"

View File

@ -844,9 +844,9 @@ func TestServer_Reload_TLS_Shared_Keyloader(t *testing.T) {
}
assert.Nil(agent.Reload(newConfig))
assert.Equal(agent.Config.TLSConfig.CertFile, newConfig.TLSConfig.CertFile)
assert.Equal(agent.Config.TLSConfig.KeyFile, newConfig.TLSConfig.KeyFile)
assert.Equal(agent.Config.TLSConfig.GetKeyLoader(), originalKeyloader)
assert.Equal(agent.Agent.config.TLSConfig.CertFile, newConfig.TLSConfig.CertFile)
assert.Equal(agent.Agent.config.TLSConfig.KeyFile, newConfig.TLSConfig.KeyFile)
assert.Equal(agent.Agent.config.TLSConfig.GetKeyLoader(), originalKeyloader)
// Assert is passed through on the server correctly
if assert.NotNil(agent.server.GetConfig().TLSConfig) {
@ -1082,7 +1082,7 @@ func TestServer_Reload_TLS_DowngradeFromTLS(t *testing.T) {
err := agent.Reload(newConfig)
assert.Nil(err)
assert.True(agentConfig.TLSConfig.IsEmpty())
assert.True(agent.config.TLSConfig.IsEmpty())
}
func TestServer_ShouldReload_ReturnFalseForNoChanges(t *testing.T) {

View File

@ -1008,7 +1008,7 @@ func (c *Command) handleReload() {
}
}
if s := c.agent.Client(); s != nil {
if client := c.agent.Client(); client != nil {
c.agent.logger.Debug("starting reload of client config")
clientConfig, err := convertClientConfig(newConf)
if err != nil {
@ -1022,7 +1022,7 @@ func (c *Command) handleReload() {
return
}
if err := c.agent.Client().Reload(clientConfig); err != nil {
if err := client.Reload(clientConfig); err != nil {
c.agent.logger.Error("reloading client config failed", "error", err)
return
}

View File

@ -21,11 +21,13 @@ import (
"github.com/hashicorp/go-sockaddr/template"
client "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/version"
"golang.org/x/exp/slices"
)
// Config is the configuration for the Nomad agent.
@ -332,6 +334,28 @@ type ClientConfig struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (c *ClientConfig) Copy() *ClientConfig {
if c == nil {
return c
}
nc := *c
nc.Servers = slices.Clone(c.Servers)
nc.Options = helper.CopyMap(c.Options)
nc.Meta = helper.CopyMap(c.Meta)
nc.ChrootEnv = helper.CopyMap(c.ChrootEnv)
nc.Reserved = c.Reserved.Copy()
nc.NoHostUUID = pointer.Copy(c.NoHostUUID)
nc.TemplateConfig = c.TemplateConfig.Copy()
nc.ServerJoin = c.ServerJoin.Copy()
nc.HostVolumes = helper.CopySlice(c.HostVolumes)
nc.HostNetworks = helper.CopySlice(c.HostNetworks)
nc.NomadServiceDiscovery = pointer.Copy(c.NomadServiceDiscovery)
nc.Artifact = c.Artifact.Copy()
nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL)
return &nc
}
// ACLConfig is configuration specific to the ACL system
type ACLConfig struct {
// Enabled controls if we are enforce and manage ACLs
@ -360,6 +384,16 @@ type ACLConfig struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *ACLConfig) Copy() *ACLConfig {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
// ServerConfig is configuration specific to the server mode
type ServerConfig struct {
// Enabled controls if we are a server
@ -555,6 +589,29 @@ type ServerConfig struct {
RaftBoltConfig *RaftBoltConfig `hcl:"raft_boltdb"`
}
func (s *ServerConfig) Copy() *ServerConfig {
if s == nil {
return nil
}
ns := *s
ns.RaftMultiplier = pointer.Copy(s.RaftMultiplier)
ns.NumSchedulers = pointer.Copy(s.NumSchedulers)
ns.EnabledSchedulers = slices.Clone(s.EnabledSchedulers)
ns.StartJoin = slices.Clone(s.StartJoin)
ns.RetryJoin = slices.Clone(s.RetryJoin)
ns.ServerJoin = s.ServerJoin.Copy()
ns.DefaultSchedulerConfig = s.DefaultSchedulerConfig.Copy()
ns.PlanRejectionTracker = s.PlanRejectionTracker.Copy()
ns.EnableEventBroker = pointer.Copy(s.EnableEventBroker)
ns.EventBufferSize = pointer.Copy(s.EventBufferSize)
ns.licenseAdditionalPublicKeys = slices.Clone(s.licenseAdditionalPublicKeys)
ns.ExtraKeysHCL = slices.Clone(s.ExtraKeysHCL)
ns.Search = s.Search.Copy()
ns.RaftBoltConfig = s.RaftBoltConfig.Copy()
return &ns
}
// RaftBoltConfig is used in servers to configure parameters of the boltdb
// used for raft consensus.
type RaftBoltConfig struct {
@ -566,6 +623,15 @@ type RaftBoltConfig struct {
NoFreelistSync bool `hcl:"no_freelist_sync"`
}
func (r *RaftBoltConfig) Copy() *RaftBoltConfig {
if r == nil {
return nil
}
nr := *r
return &nr
}
// PlanRejectionTracker is used in servers to configure the plan rejection
// tracker.
type PlanRejectionTracker struct {
@ -585,6 +651,17 @@ type PlanRejectionTracker struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (p *PlanRejectionTracker) Copy() *PlanRejectionTracker {
if p == nil {
return nil
}
np := *p
np.Enabled = pointer.Copy(p.Enabled)
np.ExtraKeysHCL = slices.Clone(p.ExtraKeysHCL)
return &np
}
func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker {
if p == nil {
return b
@ -649,6 +726,15 @@ type Search struct {
MinTermLength int `hcl:"min_term_length"`
}
func (s *Search) Copy() *Search {
if s == nil {
return nil
}
ns := *s
return &ns
}
// ServerJoin is used in both clients and servers to bootstrap connections to
// servers
type ServerJoin struct {
@ -676,6 +762,18 @@ type ServerJoin struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (s *ServerJoin) Copy() *ServerJoin {
if s == nil {
return nil
}
ns := *s
ns.StartJoin = slices.Clone(s.StartJoin)
ns.RetryJoin = slices.Clone(s.RetryJoin)
ns.ExtraKeysHCL = slices.Clone(s.ExtraKeysHCL)
return &ns
}
func (s *ServerJoin) Merge(b *ServerJoin) *ServerJoin {
if s == nil {
return b
@ -810,6 +908,19 @@ type Telemetry struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (t *Telemetry) Copy() *Telemetry {
if t == nil {
return nil
}
nt := *t
nt.DataDogTags = slices.Clone(t.DataDogTags)
nt.PrefixFilter = slices.Clone(t.PrefixFilter)
nt.FilterDefault = pointer.Copy(t.FilterDefault)
nt.ExtraKeysHCL = slices.Clone(t.ExtraKeysHCL)
return &nt
}
// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (a *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range a.PrefixFilter {
@ -838,6 +949,16 @@ type Ports struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (p *Ports) Copy() *Ports {
if p == nil {
return nil
}
np := *p
np.ExtraKeysHCL = slices.Clone(p.ExtraKeysHCL)
return &np
}
// Addresses encapsulates all of the addresses we bind to for various
// network services. Everything is optional and defaults to BindAddr.
type Addresses struct {
@ -848,6 +969,16 @@ type Addresses struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *Addresses) Copy() *Addresses {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
@ -857,6 +988,16 @@ type NormalizedAddrs struct {
Serf string
}
func (n *NormalizedAddrs) Copy() *NormalizedAddrs {
if n == nil {
return nil
}
nn := *n
nn.HTTP = slices.Clone(n.HTTP)
return &nn
}
// AdvertiseAddrs is used to control the addresses we advertise out for
// different network services. All are optional and default to BindAddr and
// their default Port.
@ -868,6 +1009,16 @@ type AdvertiseAddrs struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (a *AdvertiseAddrs) Copy() *AdvertiseAddrs {
if a == nil {
return nil
}
na := *a
na.ExtraKeysHCL = slices.Clone(a.ExtraKeysHCL)
return &na
}
type Resources struct {
CPU int `hcl:"cpu"`
MemoryMB int `hcl:"memory"`
@ -878,6 +1029,16 @@ type Resources struct {
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
func (r *Resources) Copy() *Resources {
if r == nil {
return nil
}
nr := *r
nr.ExtraKeysHCL = slices.Clone(r.ExtraKeysHCL)
return &nr
}
// devModeConfig holds the config for the -dev and -dev-connect flags
type devModeConfig struct {
// mode flags are set at the command line via -dev and -dev-connect
@ -1310,6 +1471,42 @@ func (c *Config) Merge(b *Config) *Config {
return &result
}
// Copy returns a deep copy safe for mutation.
func (c *Config) Copy() *Config {
if c == nil {
return nil
}
nc := *c
nc.Ports = c.Ports.Copy()
nc.Addresses = c.Addresses.Copy()
nc.normalizedAddrs = c.normalizedAddrs.Copy()
nc.AdvertiseAddrs = c.AdvertiseAddrs.Copy()
nc.Client = c.Client.Copy()
nc.Server = c.Server.Copy()
nc.ACL = c.ACL.Copy()
nc.Telemetry = c.Telemetry.Copy()
nc.DisableUpdateCheck = pointer.Copy(c.DisableUpdateCheck)
nc.Consul = c.Consul.Copy()
nc.Vault = c.Vault.Copy()
nc.UI = c.UI.Copy()
nc.NomadConfig = c.NomadConfig.Copy()
nc.ClientConfig = c.ClientConfig.Copy()
nc.Version = c.Version.Copy()
nc.Files = slices.Clone(c.Files)
nc.TLSConfig = c.TLSConfig.Copy()
nc.HTTPAPIResponseHeaders = helper.CopyMap(c.HTTPAPIResponseHeaders)
nc.Sentinel = c.Sentinel.Copy()
nc.Autopilot = c.Autopilot.Copy()
nc.Plugins = helper.CopySlice(c.Plugins)
nc.Limits = c.Limits.Copy()
nc.Audit = c.Audit.Copy()
nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL)
return &nc
}
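Copy is what makes read-modify-write on the agent config safe: mutate the returned copy freely, then publish it; goroutines still holding the old pointer keep reading a stable snapshot. A short usage sketch (DefaultConfig and the Ports.HTTP field are assumed from elsewhere in this package):

base := DefaultConfig()
next := base.Copy()
next.Ports.HTTP = 5656 // mutate only the copy
// base.Ports.HTTP is unchanged: Copy cloned Ports rather than aliasing
// it, so readers of base never observe a partially applied update.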
// normalizeAddrs normalizes Addresses and AdvertiseAddrs to always be
// initialized and have reasonable defaults.
func (c *Config) normalizeAddrs() error {

View File

@ -935,7 +935,7 @@ func TestHTTP_VerifyHTTPSClient_AfterConfigReload(t *testing.T) {
assert.Nil(err)
resp, err := client.Do(req)
if assert.Nil(err) {
if assert.NoError(err) {
resp.Body.Close()
assert.Equal(resp.StatusCode, 200)
}

View File

@ -79,7 +79,6 @@ func TestHTTP_PrefixJobsList(t *testing.T) {
"aabbbbbb-e8f7-fd38-c855-ab94ceb89706",
"aabbcccc-e8f7-fd38-c855-ab94ceb89706",
}
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
for i := 0; i < 3; i++ {
// Create the job
@ -3720,7 +3719,7 @@ func TestConversion_apiConnectSidecarServiceProxyToStructs(t *testing.T) {
require.Equal(t, &structs.ConsulProxy{
LocalServiceAddress: "192.168.30.1",
LocalServicePort: 9000,
Config: nil,
Config: map[string]any{},
Upstreams: []structs.ConsulUpstream{{
DestinationName: "upstream",
}},

View File

@ -33,6 +33,10 @@ var invalidFilenameNonASCII = regexp.MustCompile(`[[:^ascii:]/\\<>:"|?*]`)
// invalidFilenameStrict = invalidFilename plus additional punctuation
var invalidFilenameStrict = regexp.MustCompile(`[/\\<>:"|?*$()+=[\];#@~,&']`)
type Copyable[T any] interface {
Copy() T
}
// IsUUID returns true if the given string is a valid UUID.
func IsUUID(str string) bool {
const uuidLen = 36
@ -262,9 +266,9 @@ func CompareMapStringString(a, b map[string]string) bool {
// CopyMap creates a copy of m. Struct values are not deep copies.
//
// If m is nil or contains no elements, the return value is nil.
// If m is nil, the return value is nil.
func CopyMap[M ~map[K]V, K comparable, V any](m M) M {
if len(m) == 0 {
if m == nil {
return nil
}
@ -275,16 +279,44 @@ func CopyMap[M ~map[K]V, K comparable, V any](m M) M {
return result
}
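The new contract is easiest to see side by side: nil round-trips to nil, while a zero-element map comes back as a distinct zero-element map rather than being collapsed to nil. A runnable sketch:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	var m map[string]int
	fmt.Println(helper.CopyMap(m) == nil) // true: nil in, nil out

	empty := map[string]int{}
	cp := helper.CopyMap(empty)
	fmt.Println(cp == nil, len(cp)) // false 0: a new zero-element map
}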
// DeepCopyMap creates a copy of m by calling Copy() on each value.
//
// If m is nil, the return value is nil.
func DeepCopyMap[M ~map[K]V, K comparable, V Copyable[V]](m M) M {
if m == nil {
return nil
}
result := make(M, len(m))
for k, v := range m {
result[k] = v.Copy()
}
return result
}
// CopySlice creates a deep copy of s. For slices with elements that do not
// implement Copy(), use slices.Clone.
func CopySlice[S ~[]E, E Copyable[E]](s S) S {
if s == nil {
return nil
}
result := make(S, len(s))
for i, v := range s {
result[i] = v.Copy()
}
return result
}
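Both generic helpers hang off the Copyable constraint above: the element type must provide a Copy method returning its own type, which pointer types satisfy when Copy is declared on the pointer receiver. A sketch with a hypothetical Widget, written as if inside this package (fmt import assumed):

// Widget exists only to demonstrate satisfying Copyable.
type Widget struct{ Name string }

// Copy gives *Widget the Copyable[*Widget] shape both helpers require.
func (w *Widget) Copy() *Widget {
	if w == nil {
		return nil
	}
	nw := *w
	return &nw
}

func exampleCopyHelpers() {
	ws := []*Widget{{Name: "a"}, {Name: "b"}}
	clones := CopySlice(ws) // each element is Copy()'d
	clones[0].Name = "changed"
	fmt.Println(ws[0].Name) // "a": the originals are untouched

	m := map[string]*Widget{"a": {Name: "a"}}
	_ = DeepCopyMap(m) // map values are Copy()'d the same way
}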
// CopyMapStringString creates a copy of m.
//
// Deprecated: Use CopyMap instead.
func CopyMapStringString(m map[string]string) map[string]string {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]string, l)
c := make(map[string]string, len(m))
for k, v := range m {
c[k] = v
}
@ -295,12 +327,11 @@ func CopyMapStringString(m map[string]string) map[string]string {
//
// Deprecated: Use CopyMap instead.
func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]struct{}, l)
c := make(map[string]struct{}, len(m))
for k := range m {
c[k] = struct{}{}
}
@ -311,12 +342,11 @@ func CopyMapStringStruct(m map[string]struct{}) map[string]struct{} {
//
// Deprecated: Use CopyMap instead.
func CopyMapStringInterface(m map[string]interface{}) map[string]interface{} {
l := len(m)
if l == 0 {
if m == nil {
return nil
}
c := make(map[string]interface{}, l)
c := make(map[string]interface{}, len(m))
for k, v := range m {
c[k] = v
}

View File

@ -69,7 +69,7 @@ func Test_CopyMap(t *testing.T) {
t.Run("empty", func(t *testing.T) {
m := make(map[string]int, 10)
result := CopyMap(m)
must.Nil(t, result)
must.MapEq(t, map[string]int{}, result)
})
t.Run("elements", func(t *testing.T) {

View File

@ -5,3 +5,12 @@ package pointer
func Of[A any](a A) *A {
return &a
}
// Copy returns a pointer to a copy of the value a points to, or nil if a is nil.
func Copy[A any](a *A) *A {
if a == nil {
return nil
}
na := *a
return &na
}
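pointer.Copy is the scalar sibling of the container helpers: it allocates a fresh value rather than aliasing, so mutating the copy can never leak into the original. For example:

a := pointer.Of(3)
b := pointer.Copy(a)
*b = 7
fmt.Println(*a, *b) // 3 7: two independent allocations

var p *int
fmt.Println(pointer.Copy(p) == nil) // true: nil in, nil out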

View File

@ -541,8 +541,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
require.NoError(t, err, "could not setup test client")
}
node1 := c1.Node()
node1.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
node1 := c1.UpdateConfig(func(c *config.Config) {
c.Node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
}).Node
req := &structs.NodeRegisterRequest{
Node: node1,
@ -568,7 +569,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
}
// update w/ plugin
node1.CSIControllerPlugins = plugins
node1 = c1.UpdateConfig(func(c *config.Config) {
c.Node.CSIControllerPlugins = plugins
}).Node
s1.fsm.state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)
cleanup := func() {

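Tests no longer reach into the shared Node directly; they go through the client's UpdateConfig, which copies the current config, applies the caller's mutation to the copy, and swaps the config pointer under the lock before returning the result. A hedged sketch of that shape; the real method lives in the client package and may differ in detail:

func (c *Client) UpdateConfig(cb func(*config.Config)) *config.Config {
	c.configLock.Lock()
	defer c.configLock.Unlock()

	newConfig := c.config.Copy() // never mutate the published config
	cb(newConfig)                // caller edits the private copy
	c.config = newConfig         // swap the pointer while locked
	return newConfig
}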
View File

@ -8,9 +8,11 @@ import (
"time"
log "github.com/hashicorp/go-hclog"
"golang.org/x/exp/slices"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
"github.com/hashicorp/nomad/nomad/structs"
@ -373,6 +375,36 @@ type Config struct {
DeploymentQueryRateLimit float64
}
func (c *Config) Copy() *Config {
if c == nil {
return nil
}
nc := *c
// Can't deep copy interfaces
// LogOutput io.Writer
// Logger log.InterceptLogger
// PluginLoader loader.PluginCatalog
// PluginSingletonLoader loader.PluginCatalog
nc.RPCAddr = pointer.Copy(c.RPCAddr)
nc.ClientRPCAdvertise = pointer.Copy(c.ClientRPCAdvertise)
nc.ServerRPCAdvertise = pointer.Copy(c.ServerRPCAdvertise)
nc.RaftConfig = pointer.Copy(c.RaftConfig)
nc.SerfConfig = pointer.Copy(c.SerfConfig)
nc.EnabledSchedulers = slices.Clone(c.EnabledSchedulers)
nc.ConsulConfig = c.ConsulConfig.Copy()
nc.VaultConfig = c.VaultConfig.Copy()
nc.TLSConfig = c.TLSConfig.Copy()
nc.SentinelConfig = c.SentinelConfig.Copy()
nc.AutopilotConfig = c.AutopilotConfig.Copy()
nc.LicenseConfig = c.LicenseConfig.Copy()
nc.SearchConfig = c.SearchConfig.Copy()
return &nc
}
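The fields listed in that comment are interfaces, and nc := *c copies them as interface values, so the copy and the original keep pointing at the same underlying objects; there is no generic way to clone through an interface. A tiny standalone sketch of what that shallow copy means:

package main

import (
	"fmt"
	"io"
	"os"
)

type holder struct{ out io.Writer }

func main() {
	h1 := holder{out: os.Stdout}
	h2 := h1                      // shallow copy of the interface value
	fmt.Println(h1.out == h2.out) // true: both alias one writer
}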
// DefaultConfig returns the default configuration. Only used as the basis for
// merging agent or test parameters.
func DefaultConfig() *Config {

View File

@ -993,8 +993,10 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
node := client.UpdateConfig(func(c *cconfig.Config) {
// client RPCs not supported on early versions
c.Node.Attributes["nomad.version"] = "0.11.0"
}).Node
req0 := &structs.NodeRegisterRequest{
Node: node,
@ -1017,24 +1019,26 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
SupportsCreateDelete: true,
node = client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
SupportsCreateDelete: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}
c.Node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
@ -1129,8 +1133,10 @@ func TestCSIVolumeEndpoint_Delete(t *testing.T) {
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
node := client.UpdateConfig(func(c *cconfig.Config) {
// client RPCs not supported on early versions
c.Node.Attributes["nomad.version"] = "0.11.0"
}).Node
req0 := &structs.NodeRegisterRequest{
Node: node,
@ -1153,23 +1159,25 @@ func TestCSIVolumeEndpoint_Delete(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
node = client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}
c.Node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
@ -1266,8 +1274,10 @@ func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
node := client.UpdateConfig(func(c *cconfig.Config) {
// client RPCs not supported on early versions
c.Node.Attributes["nomad.version"] = "0.11.0"
}).Node
req0 := &structs.NodeRegisterRequest{
Node: node,
@ -1288,24 +1298,26 @@ func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
SupportsListVolumes: true,
node = client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
SupportsListVolumes: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}
c.Node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
@ -1359,10 +1371,8 @@ func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) {
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
Node: client.Node(),
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
@ -1382,16 +1392,18 @@ func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
node := client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
@ -1452,10 +1464,8 @@ func TestCSIVolumeEndpoint_DeleteSnapshot(t *testing.T) {
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
Node: client.Node(),
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
@ -1475,16 +1485,18 @@ func TestCSIVolumeEndpoint_DeleteSnapshot(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
node := client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
@ -1551,9 +1563,8 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
Node: client.Node(),
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
@ -1571,16 +1582,18 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsListSnapshots: true,
node := client.UpdateConfig(func(c *cconfig.Config) {
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsListSnapshots: true,
},
RequiresControllerPlugin: true,
},
RequiresControllerPlugin: true,
},
}
}
}).Node
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))

View File

@ -5,6 +5,7 @@ package nomad
import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"golang.org/x/exp/slices"
)
// LicenseConfig allows for tunable licensing config
@ -13,6 +14,16 @@ type LicenseConfig struct {
AdditionalPubKeys []string
}
func (c *LicenseConfig) Copy() *LicenseConfig {
if c == nil {
return nil
}
nc := *c
nc.AdditionalPubKeys = slices.Clone(c.AdditionalPubKeys)
return &nc
}
type EnterpriseState struct{}
func (es *EnterpriseState) Features() uint64 {

View File

@ -1,11 +1,26 @@
package config
import (
"github.com/hashicorp/nomad/helper"
"golang.org/x/exp/slices"
)
// SentinelConfig is configuration specific to Sentinel
type SentinelConfig struct {
// Imports are the configured imports
Imports []*SentinelImport `hcl:"import,expand"`
}
func (s *SentinelConfig) Copy() *SentinelConfig {
if s == nil {
return nil
}
ns := *s
ns.Imports = helper.CopySlice(s.Imports)
return &ns
}
// SentinelImport is used per configured import
type SentinelImport struct {
Name string `hcl:",key"`
@ -13,6 +28,16 @@ type SentinelImport struct {
Args []string `hcl:"args"`
}
func (s *SentinelImport) Copy() *SentinelImport {
if s == nil {
return nil
}
ns := *s
ns.Args = slices.Clone(s.Args)
return &ns
}
// Merge is used to merge two Sentinel configs together. The settings from the input always take precedence.
func (a *SentinelConfig) Merge(b *SentinelConfig) *SentinelConfig {
result := *a

View File

@ -125,16 +125,6 @@ func (k *KeyLoader) GetClientCertificate(*tls.CertificateRequestInfo) (*tls.Cert
return k.certificate, nil
}
func (k *KeyLoader) Copy() *KeyLoader {
if k == nil {
return nil
}
new := KeyLoader{}
new.certificate = k.certificate
return &new
}
// GetKeyLoader returns the keyloader for a TLSConfig object. If the keyloader
// has not been initialized, it will first do so.
func (t *TLSConfig) GetKeyLoader() *KeyLoader {
@ -162,8 +152,12 @@ func (t *TLSConfig) Copy() *TLSConfig {
new.CAFile = t.CAFile
new.CertFile = t.CertFile
// Shallow copy the key loader as its GetOutgoingCertificate method is what
// is used by the HTTP server to retrieve the certificate. If we create a new
// KeyLoader struct, the HTTP server will still be calling the old
// GetOutgoingCertificate method.
t.keyloaderLock.Lock()
new.KeyLoader = t.KeyLoader.Copy()
new.KeyLoader = t.KeyLoader
t.keyloaderLock.Unlock()
new.KeyFile = t.KeyFile

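Sharing rather than copying means every TLSConfig copy aliases a single KeyLoader, so a certificate reloaded through any one of them is served by all of them, and KeyLoader's internal locking keeps that sharing race-free. An illustrative sketch (LoadKeyPair is assumed from KeyLoader's existing API; the cert paths are placeholders):

orig := &TLSConfig{CertFile: "old.crt", KeyFile: "old.key"}
cp := orig.Copy()

// One loader sits behind both configs:
fmt.Println(orig.GetKeyLoader() == cp.GetKeyLoader()) // true

// Reloading through the copy is immediately visible to servers calling
// GetOutgoingCertificate against the original's loader.
if _, err := cp.GetKeyLoader().LoadKeyPair("new.crt", "new.key"); err != nil {
	// handle a missing or malformed key pair
}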
View File

@ -125,6 +125,15 @@ type AutopilotConfig struct {
ModifyIndex uint64
}
func (a *AutopilotConfig) Copy() *AutopilotConfig {
if a == nil {
return nil
}
na := *a
return &na
}
// SchedulerAlgorithm is an enum string that encapsulates the valid options for a
// SchedulerConfiguration stanza's SchedulerAlgorithm. These modes will allow the
// scheduler to be user-selectable.
@ -167,6 +176,15 @@ type SchedulerConfiguration struct {
ModifyIndex uint64
}
func (s *SchedulerConfiguration) Copy() *SchedulerConfiguration {
if s == nil {
return nil
}
ns := *s
return &ns
}
func (s *SchedulerConfiguration) EffectiveSchedulerAlgorithm() SchedulerAlgorithm {
if s == nil || s.SchedulerAlgorithm == "" {
return SchedulerAlgorithmBinpack

View File

@ -59,6 +59,15 @@ type SearchConfig struct {
MinTermLength int `hcl:"min_term_length"`
}
func (s *SearchConfig) Copy() *SearchConfig {
if s == nil {
return nil
}
ns := *s
return &ns
}
// SearchResponse is used to return matches and information about whether
// the match list is truncated specific to each type of Context.
type SearchResponse struct {

View File

@ -2059,9 +2059,8 @@ func (n *Node) Copy() *Node {
if n == nil {
return nil
}
nn := new(Node)
*nn = *n
nn.Attributes = helper.CopyMapStringString(nn.Attributes)
nn := *n
nn.Attributes = helper.CopyMap(nn.Attributes)
nn.NodeResources = nn.NodeResources.Copy()
nn.ReservedResources = nn.ReservedResources.Copy()
nn.Resources = nn.Resources.Copy()
@ -2069,87 +2068,14 @@ func (n *Node) Copy() *Node {
nn.Links = helper.CopyMapStringString(nn.Links)
nn.Meta = helper.CopyMapStringString(nn.Meta)
nn.DrainStrategy = nn.DrainStrategy.Copy()
nn.Events = copyNodeEvents(n.Events)
nn.Drivers = copyNodeDrivers(n.Drivers)
nn.CSIControllerPlugins = copyNodeCSI(nn.CSIControllerPlugins)
nn.CSINodePlugins = copyNodeCSI(nn.CSINodePlugins)
nn.HostVolumes = copyNodeHostVolumes(n.HostVolumes)
nn.HostNetworks = copyNodeHostNetworks(n.HostNetworks)
nn.Events = helper.CopySlice(n.Events)
nn.Drivers = helper.DeepCopyMap(n.Drivers)
nn.CSIControllerPlugins = helper.DeepCopyMap(nn.CSIControllerPlugins)
nn.CSINodePlugins = helper.DeepCopyMap(nn.CSINodePlugins)
nn.HostVolumes = helper.DeepCopyMap(n.HostVolumes)
nn.HostNetworks = helper.DeepCopyMap(n.HostNetworks)
nn.LastDrain = nn.LastDrain.Copy()
return nn
}
// copyNodeEvents is a helper to copy a list of NodeEvent's
func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
l := len(events)
if l == 0 {
return nil
}
c := make([]*NodeEvent, l)
for i, event := range events {
c[i] = event.Copy()
}
return c
}
// copyNodeCSI is a helper to copy a map of CSIInfo
func copyNodeCSI(plugins map[string]*CSIInfo) map[string]*CSIInfo {
l := len(plugins)
if l == 0 {
return nil
}
c := make(map[string]*CSIInfo, l)
for plugin, info := range plugins {
c[plugin] = info.Copy()
}
return c
}
// copyNodeDrivers is a helper to copy a map of DriverInfo
func copyNodeDrivers(drivers map[string]*DriverInfo) map[string]*DriverInfo {
l := len(drivers)
if l == 0 {
return nil
}
c := make(map[string]*DriverInfo, l)
for driver, info := range drivers {
c[driver] = info.Copy()
}
return c
}
// copyNodeHostVolumes is a helper to copy a map of string to Volume
func copyNodeHostVolumes(volumes map[string]*ClientHostVolumeConfig) map[string]*ClientHostVolumeConfig {
l := len(volumes)
if l == 0 {
return nil
}
c := make(map[string]*ClientHostVolumeConfig, l)
for volume, v := range volumes {
c[volume] = v.Copy()
}
return c
}
// copyNodeHostVolumes is a helper to copy a map of string to HostNetwork
func copyNodeHostNetworks(networks map[string]*ClientHostNetworkConfig) map[string]*ClientHostNetworkConfig {
l := len(networks)
if l == 0 {
return nil
}
c := make(map[string]*ClientHostNetworkConfig, l)
for network, v := range networks {
c[network] = v.Copy()
}
return c
return &nn
}
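The bespoke helpers all shared one body, so DeepCopyMap subsumes them; it type-checks here because each map's value type already has a pointer-receiver Copy returning its own type, as the deleted copyNodeCSI shows for *CSIInfo. Spelled out, helper.DeepCopyMap(nn.CSINodePlugins) instantiates to roughly:

func copyCSIInfoMap(m map[string]*CSIInfo) map[string]*CSIInfo {
	if m == nil {
		return nil
	}
	c := make(map[string]*CSIInfo, len(m))
	for k, v := range m {
		c[k] = v.Copy() // *CSIInfo satisfies Copyable[*CSIInfo]
	}
	return c
}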
// TerminalStatus returns if the current status is terminal and

View File

@ -30,6 +30,15 @@ type VersionInfo struct {
VersionMetadata string
}
func (v *VersionInfo) Copy() *VersionInfo {
if v == nil {
return nil
}
nv := *v
return &nv
}
func GetVersion() *VersionInfo {
ver := Version
rel := VersionPrerelease