Merge pull request #2009 from hashicorp/f-use-embedded-consul

Add a chaos test for consul syncer and fix some races it found
This commit is contained in:
Michael Schurter 2016-12-05 10:55:47 -08:00 committed by GitHub
commit 09ffe4307d
3 changed files with 307 additions and 26 deletions

View file

@ -0,0 +1,193 @@
// +build chaos
package consul
import (
"fmt"
"io/ioutil"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
func TestSyncerChaos(t *testing.T) {
// Create an embedded Consul server
testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
defer testconsul.Stop()
// Configure Syncer to talk to the test server
cconf := config.DefaultConsulConfig()
cconf.Addr = testconsul.HTTPAddr
clientSyncer, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Error creating Syncer: %v", err)
}
defer clientSyncer.Shutdown()
execSyncer, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Error creating Syncer: %v", err)
}
defer execSyncer.Shutdown()
clientService := &structs.Service{Name: "nomad-client"}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(clientService): clientService,
}
if err := clientSyncer.SetServices("client", services); err != nil {
t.Fatalf("error setting client service: %v", err)
}
const execn = 100
const reapern = 2
errors := make(chan error, 100)
wg := sync.WaitGroup{}
// Start goroutines to concurrently SetServices
for i := 0; i < execn; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
domain := ServiceDomain(fmt.Sprintf("exec-%d", i))
services := map[ServiceKey]*structs.Service{}
for ii := 0; ii < 10; ii++ {
s := &structs.Service{Name: fmt.Sprintf("exec-%d-%d", i, ii)}
services[GenerateServiceKey(s)] = s
if err := execSyncer.SetServices(domain, services); err != nil {
select {
case errors <- err:
default:
}
return
}
time.Sleep(1)
}
}(i)
}
// SyncServices runs a timer started by Syncer.Run which we don't use
// in this test, so run SyncServices concurrently
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < execn; i++ {
if err := execSyncer.SyncServices(); err != nil {
select {
case errors <- err:
default:
}
return
}
time.Sleep(100)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := clientSyncer.ReapUnmatched([]ServiceDomain{"nomad-client"}); err != nil {
select {
case errors <- err:
default:
}
return
}
}()
// Reap all but exec-0-*
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < execn; i++ {
if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0", ServiceDomain(fmt.Sprintf("exec-%d", i))}); err != nil {
select {
case errors <- err:
default:
}
}
time.Sleep(100)
}
}()
go func() {
wg.Wait()
close(errors)
}()
for err := range errors {
if err != nil {
t.Errorf("error setting service from executor goroutine: %v", err)
}
}
// Do a final ReapUnmatched to get consul back into a deterministic state
if err := execSyncer.ReapUnmatched([]ServiceDomain{"exec-0"}); err != nil {
t.Fatalf("error doing final reap: %v", err)
}
// flattenedServices should be fully populated as ReapUnmatched doesn't
// touch Syncer's internal state
expected := map[string]struct{}{}
for i := 0; i < execn; i++ {
for ii := 0; ii < 10; ii++ {
expected[fmt.Sprintf("exec-%d-%d", i, ii)] = struct{}{}
}
}
for _, s := range execSyncer.flattenedServices() {
_, ok := expected[s.Name]
if !ok {
t.Errorf("%s unexpected", s.Name)
}
delete(expected, s.Name)
}
if len(expected) > 0 {
left := []string{}
for s := range expected {
left = append(left, s)
}
sort.Strings(left)
t.Errorf("Couldn't find %d names in flattened services:\n%s", len(expected), strings.Join(left, "\n"))
}
// All but exec-0 and possibly some of exec-99 should have been reaped
{
services, err := execSyncer.client.Agent().Services()
if err != nil {
t.Fatalf("Error getting services: %v", err)
}
expected := []int{}
for k, service := range services {
if service.Service == "consul" {
continue
}
i := -1
ii := -1
fmt.Sscanf(service.Service, "exec-%d-%d", &i, &ii)
switch {
case i == -1 || ii == -1:
t.Errorf("invalid service: %s -> %s", k, service.Service)
case i != 0 || ii > 9:
t.Errorf("unexpected service: %s -> %s", k, service.Service)
default:
expected = append(expected, ii)
}
}
if len(expected) != 10 {
t.Errorf("expected 0-9 but found: %#q", expected)
}
}
}

View file

@ -35,7 +35,6 @@ import (
"time"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
@ -56,11 +55,11 @@ const (
nomadServicePrefix = "_nomad"
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
defaultSyncInterval = 6 * time.Second
// syncJitter provides a little variance in the frequency at which
// defaultSyncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter = 8
defaultSyncJitter = time.Second
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
@ -144,6 +143,13 @@ type Syncer struct {
periodicCallbacks map[string]types.PeriodicCallback
notifySyncCh chan struct{}
periodicLock sync.RWMutex
// The periodic time interval for syncing services and checks with Consul
syncInterval time.Duration
// syncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter time.Duration
}
// NewSyncer returns a new consul.Syncer
@ -168,8 +174,11 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
notifySyncCh: make(chan struct{}, 1),
// default noop implementation of addrFinder
addrFinder: func(string) (string, int) { return "", 0 },
addrFinder: func(string) (string, int) { return "", 0 },
syncInterval: defaultSyncInterval,
syncJitter: defaultSyncJitter,
}
return &consulSyncer, nil
@ -809,7 +818,7 @@ func (c *Syncer) Run() {
for {
select {
case <-sync.C:
d := syncInterval - lib.RandomStagger(syncInterval/syncJitter)
d := c.syncInterval - c.syncJitter
sync.Reset(d)
if err := c.SyncServices(); err != nil {
@ -824,7 +833,7 @@ func (c *Syncer) Run() {
c.consulAvailable = true
}
case <-c.notifySyncCh:
sync.Reset(syncInterval)
sync.Reset(0)
case <-c.shutdownCh:
c.Shutdown()
case <-c.notifyShutdownCh:
@ -872,8 +881,8 @@ func (c *Syncer) SyncServices() error {
// the syncer
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for serviceID, service := range consulServices {
for domain := range c.servicesGroups {
if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
@ -889,8 +898,8 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
// services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for checkID, check := range consulChecks {
for domain := range c.checkGroups {
if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {

View file

@ -1,6 +1,7 @@
package consul
import (
"io/ioutil"
"log"
"net"
"os"
@ -9,6 +10,7 @@ import (
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)
@ -21,6 +23,69 @@ const (
var logger = log.New(os.Stdout, "", log.LstdFlags)
func TestSyncNow(t *testing.T) {
cs, testconsul := testConsul(t)
defer cs.Shutdown()
defer testconsul.Stop()
cs.SetAddrFinder(func(h string) (string, int) {
a, pstr, _ := net.SplitHostPort(h)
p, _ := net.LookupPort("tcp", pstr)
return a, p
})
cs.syncInterval = 9000 * time.Hour
service := &structs.Service{Name: "foo1", Tags: []string{"a", "b"}}
services := map[ServiceKey]*structs.Service{
GenerateServiceKey(service): service,
}
// Run syncs once on startup and then blocks forever
go cs.Run()
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
synced := false
for i := 0; !synced && i < 10; i++ {
time.Sleep(250 * time.Millisecond)
agentServices, err := cs.queryAgentServices()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
synced = len(agentServices) == 1
}
if !synced {
t.Fatalf("initial sync never occurred")
}
// SetServices again should cause another sync
service1 := &structs.Service{Name: "foo1", Tags: []string{"Y", "Z"}}
service2 := &structs.Service{Name: "bar"}
services = map[ServiceKey]*structs.Service{
GenerateServiceKey(service1): service1,
GenerateServiceKey(service2): service2,
}
if err := cs.SetServices(serviceGroupName, services); err != nil {
t.Fatalf("error setting services: %v", err)
}
synced = false
for i := 0; !synced && i < 10; i++ {
time.Sleep(250 * time.Millisecond)
agentServices, err := cs.queryAgentServices()
if err != nil {
t.Fatalf("error querying consul services: %v", err)
}
synced = len(agentServices) == 2
}
if !synced {
t.Fatalf("SetServices didn't sync immediately")
}
}
func TestCheckRegistration(t *testing.T) {
cs, err := NewSyncer(config.DefaultConsulConfig(), make(chan struct{}), logger)
if err != nil {
@ -109,16 +174,35 @@ func TestCheckRegistration(t *testing.T) {
}
}
func TestConsulServiceRegisterServices(t *testing.T) {
cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
// testConsul returns a Syncer configured with an embedded Consul server.
//
// Callers must defer Syncer.Shutdown() and TestServer.Stop()
//
func testConsul(t *testing.T) (*Syncer, *testutil.TestServer) {
// Create an embedded Consul server
testconsul := testutil.NewTestServerConfig(t, func(c *testutil.TestServerConfig) {
// If -v wasn't specified squelch consul logging
if !testing.Verbose() {
c.Stdout = ioutil.Discard
c.Stderr = ioutil.Discard
}
})
// Configure Syncer to talk to the test server
cconf := config.DefaultConsulConfig()
cconf.Addr = testconsul.HTTPAddr
cs, err := NewSyncer(cconf, nil, logger)
if err != nil {
t.Fatalf("Err: %v", err)
t.Fatalf("Error creating Syncer: %v", err)
}
return cs, testconsul
}
func TestConsulServiceRegisterServices(t *testing.T) {
cs, testconsul := testConsul(t)
defer cs.Shutdown()
// Skipping the test if consul isn't present
if !cs.consulPresent() {
t.Skip("skipping because consul isn't present")
}
defer testconsul.Stop()
service1 := &structs.Service{Name: "foo", Tags: []string{"a", "b"}}
service2 := &structs.Service{Name: "foo"}
@ -178,15 +262,10 @@ func TestConsulServiceRegisterServices(t *testing.T) {
}
func TestConsulServiceUpdateService(t *testing.T) {
cs, err := NewSyncer(config.DefaultConsulConfig(), nil, logger)
if err != nil {
t.Fatalf("Err: %v", err)
}
cs, testconsul := testConsul(t)
defer cs.Shutdown()
// Skipping the test if consul isn't present
if !cs.consulPresent() {
t.Skip("skipping because consul isn't present")
}
defer testconsul.Stop()
cs.SetAddrFinder(func(h string) (string, int) {
a, pstr, _ := net.SplitHostPort(h)
p, _ := net.LookupPort("tcp", pstr)