64b558c14c
Downgrading the Raft protocol version is not a supported operation. Detecting a downgrade is hard since this information is not stored in any persistent place. When a server rejoins a cluster with a prior Raft version, its Serf tag is updated, so Nomad can't tell that the version changed.

Mixed-version clusters must be supported to allow zero-downtime rolling upgrades, and during an upgrade the cluster is expected to have mixed Raft versions. Enforcing strong version consistency would disrupt this flow.

The approach taken here is to store the Raft version on disk. When the server starts, the raft_protocol value is written to the file data_dir/raft/version. If that file already exists, its content is checked against the current raft_protocol value to detect downgrades and prevent the server from starting. Any other type of error is ignored to avoid disruptions that are outside the control of operators. The only option in the case of an invalid or corrupt file would be to delete it, making this check useless; so instead the file content is overwritten with the new version, and operators are given guidance on how to verify that their cluster is in the expected state.
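A minimal sketch of that startup check, under the assumptions above; the function name checkRaftVersionFile, the error text, and the file permissions are illustrative, not Nomad's actual implementation:

package raftutil

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
)

// checkRaftVersionFile persists the server's Raft protocol version and
// refuses to start after a downgrade. Corrupt or unreadable files are
// overwritten rather than treated as fatal.
func checkRaftVersionFile(dataDir string, raftProtocol int) error {
	path := filepath.Join(dataDir, "raft", "version")

	if previous, err := os.ReadFile(path); err == nil {
		if stored, parseErr := strconv.Atoi(string(previous)); parseErr == nil && raftProtocol < stored {
			return fmt.Errorf("raft protocol downgrade from %d to %d is not supported", stored, raftProtocol)
		}
		// A file that fails to parse is treated as corrupt and simply
		// overwritten below; deleting it would make the check useless.
	}

	// Record the current version for future startups.
	return os.WriteFile(path, []byte(strconv.Itoa(raftProtocol)), 0o644)
}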
180 lines
4.7 KiB
Go
package nomad

import (
	"fmt"
	"math/rand"
	"net"
	"sync/atomic"
	"testing"
	"time"

	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/freeport"
	"github.com/hashicorp/nomad/helper/pluginutils/catalog"
	"github.com/hashicorp/nomad/helper/pluginutils/singleton"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/version"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/require"
)

var (
	nodeNumber int32 = 0
)

// TestACLServer starts a test server with ACLs enabled and bootstraps a
// management token. It returns the server, the token, and a cleanup func.
func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
	server, cleanup := TestServer(t, func(c *Config) {
		c.ACLEnabled = true
		if cb != nil {
			cb(c)
		}
	})
	token := mock.ACLManagementToken()
	err := server.State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, token)
	if err != nil {
		t.Fatalf("failed to bootstrap ACL token: %v", err)
	}
	return server, token, cleanup
}

// TestServer starts a test server, failing the test if the server cannot
// be started.
func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
	s, c, err := TestServerErr(t, cb)
	require.NoError(t, err, "failed to start test server")
	return s, c
}

// TestServerErr starts a test server with timing tightened for tests,
// retrying on port collisions, and returns any startup error to the
// caller instead of failing the test.
func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
	// Setup the default settings
	config := DefaultConfig()

	// Setup default enterprise-specific settings, including license
	defaultEnterpriseTestConfig(config)

	config.Build = version.Version + "+unittest"
	config.DevMode = true
	config.EnableEventBroker = true
	config.BootstrapExpect = 1
	nodeNum := atomic.AddInt32(&nodeNumber, 1)
	config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)

	// Configure the logger
	config.Logger, config.LogOutput = testlog.HCLoggerNode(t, nodeNum)

	// Tighten the Serf timing
	config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
	config.SerfConfig.MemberlistConfig.SuspicionMult = 2
	config.SerfConfig.MemberlistConfig.RetransmitMult = 2
	config.SerfConfig.MemberlistConfig.ProbeTimeout = 50 * time.Millisecond
	config.SerfConfig.MemberlistConfig.ProbeInterval = 100 * time.Millisecond
	config.SerfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond

	// Tighten the Raft timing
	config.RaftConfig.LeaderLeaseTimeout = 50 * time.Millisecond
	config.RaftConfig.HeartbeatTimeout = 50 * time.Millisecond
	config.RaftConfig.ElectionTimeout = 50 * time.Millisecond
	config.RaftTimeout = 500 * time.Millisecond

	// Disable Vault
	f := false
	config.VaultConfig.Enabled = &f

	// Tighten the autopilot timing
	config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond
	config.ServerHealthInterval = 50 * time.Millisecond
	config.AutopilotInterval = 100 * time.Millisecond

	// Set the plugin loaders
	config.PluginLoader = catalog.TestPluginLoader(t)
	config.PluginSingletonLoader = singleton.NewSingletonLoader(config.Logger, config.PluginLoader)

	// Disable consul autojoining: tests typically join servers directly
	config.ConsulConfig.ServerAutoJoin = &f

	// Enable fuzzy search API
	config.SearchConfig = &structs.SearchConfig{
		FuzzyEnabled:  true,
		LimitQuery:    20,
		LimitResults:  100,
		MinTermLength: 2,
	}

	// Invoke the callback if any
	if cb != nil {
		cb(config)
	}

	cCatalog := consul.NewMockCatalog(config.Logger)
	cConfigs := consul.NewMockConfigsAPI(config.Logger)
	cACLs := consul.NewMockACLsAPI(config.Logger)

	for i := 10; i >= 0; i-- {
		// Get random ports, need to cleanup later
		ports := freeport.MustTake(2)

		config.RPCAddr = &net.TCPAddr{
			IP:   []byte{127, 0, 0, 1},
			Port: ports[0],
		}
		config.SerfConfig.MemberlistConfig.BindPort = ports[1]

		// Create server
		server, err := NewServer(config, cCatalog, cConfigs, cACLs)
		if err == nil {
			return server, func() {
				ch := make(chan error)
				go func() {
					defer close(ch)

					// Shutdown server
					err := server.Shutdown()
					if err != nil {
						ch <- errors.Wrap(err, "failed to shutdown server")
					}

					freeport.Return(ports)
				}()

				select {
				case e := <-ch:
					if e != nil {
						t.Fatal(e.Error())
					}
				case <-time.After(1 * time.Minute):
					t.Fatal("timed out while shutting down server")
				}
			}, nil
		} else if i == 0 {
			freeport.Return(ports)
			return nil, nil, err
		} else {
			if server != nil {
				_ = server.Shutdown()
			}
			// Return the ports even when NewServer failed so the next
			// attempt can take a fresh pair.
			freeport.Return(ports)
			wait := time.Duration(rand.Int31n(2000)) * time.Millisecond
			time.Sleep(wait)
		}
	}

	return nil, nil, nil
}

// TestJoin joins the given servers together via Serf, failing the test on
// any error.
func TestJoin(t *testing.T, servers ...*Server) {
	for i := 0; i < len(servers)-1; i++ {
		addr := fmt.Sprintf("127.0.0.1:%d",
			servers[i].config.SerfConfig.MemberlistConfig.BindPort)

		for j := i + 1; j < len(servers); j++ {
			num, err := servers[j].Join([]string{addr})
			if err != nil {
				t.Fatalf("err: %v", err)
			}
			if num != 1 {
				t.Fatalf("bad: %d", num)
			}
		}
	}
}
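For reference, a hypothetical test built on these helpers might look like the following; the test name and the two-server topology are illustrative only:

func TestCluster_TwoServerJoin(t *testing.T) {
	// Start two servers that each expect a two-node cluster.
	s1, cleanup1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
	defer cleanup1()
	s2, cleanup2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
	defer cleanup2()

	// Join the servers over Serf; Raft leader election follows.
	TestJoin(t, s1, s2)
}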