Merge branch 'master' of https://github.com/alistanis/consul into use-http-package-statuses

This commit is contained in:
Chris Cooper 2016-02-05 17:30:43 -05:00
commit de4129fced
43 changed files with 402 additions and 238 deletions

View File

@@ -2,6 +2,16 @@
 IMPROVEMENTS:

+* Consul agents will now periodically reconnect to available Consul servers
+  in order to redistribute their RPC query load. Consul clients will, by
+  default, attempt to establish a new connection every 120s to 180s, however
+  the rate at which agents begin to query new servers is proportional to the
+  size of the Consul cluster (servers should never receive more than 64 new
+  connections per second per Consul server as a result of rebalancing).
+  Clusters in stable environments who use `allow_stale` should see a more
+  even distribution of query load across all of their Consul
+  servers. [GH-1667]

 BUG FIXES:

 * Updated the internal web ui (`-ui` option) to latest released build, fixing
@@ -46,7 +56,7 @@ IMPROVEMENTS:
   messages and HTTP access logs [GH-1513] [GH-1448]
 * API clients configured for insecure SSL now use an HTTP transport that's
   set up the same way as the Go default transport [GH-1526]
-* Added new per-host telemery on DNS requests [GH-1537]
+* Added new per-host telemetry on DNS requests [GH-1537]
 * Added support for reaping child processes which is useful when running
   Consul as PID 1 in Docker containers [GH-1539]
 * Added new `-ui` command line and `ui` config option that enables a built-in
@@ -113,12 +123,12 @@ BUG FIXES:
 * Fixed bad lock handler execution during shutdown [GH-1080] [GH-1158] [GH-1214]
 * Added missing support for AAAA queries for nodes [GH-1222]
 * Tokens passed from the CLI or API work for maint mode [GH-1230]
-* Fixed service derigister/reregister flaps that could happen during
+* Fixed service deregister/reregister flaps that could happen during
   `consul reload` [GH-1235]
 * Fixed the Go API client to properly distinguish between expired sessions
   and sessions that don't exist [GH-1041]
 * Fixed the KV section of the UI to work on Safari [GH-1321]
-* Cleaned up Javascript for built-in UI with bug fixes [GH-1338]
+* Cleaned up JavaScript for built-in UI with bug fixes [GH-1338]

 IMPROVEMENTS:
@@ -255,8 +265,8 @@ FEATURES:
 * Merge `armon/consul-api` into `api` as official Go client.
 * Support for distributed locks and semaphores in API client [GH-594] [GH-600]
 * Support for native HTTP health checks [GH-592]
-* Support for node and service maintanence modes [GH-606]
-* Added new "consul maint" command to easily toggle maintanence modes [GH-625]
+* Support for node and service maintenance modes [GH-606]
+* Added new "consul maint" command to easily toggle maintenance modes [GH-625]
 * Added new "consul lock" command for simple highly-available deployments.
   This lets Consul manage the leader election and easily handle N+1 deployments
   without the applications being Consul aware. [GH-619]
@@ -336,7 +346,7 @@ BUG FIXES:
 * Fixing issue with Session ID and ACL ID generation. [GH-391]
 * Fixing multiple headers for /v1/event/list endpoint [GH-361]
 * Fixing graceful leave of leader causing invalid Raft peers [GH-360]
-* Fixing bug with closing TLS connction on error
+* Fixing bug with closing TLS connection on error
 * Fixing issue with node reaping [GH-371]
 * Fixing aggressive deadlock time [GH-389]
 * Fixing syslog filter level [GH-272]
@@ -348,7 +358,7 @@ BUG FIXES:
 IMPROVEMENTS:

 * Use "critical" health state instead of "unknown" [GH-341]
-* Consul service can be targed for exec [GH-344]
+* Consul service can be targeted for exec [GH-344]
 * Provide debug logging for session invalidation [GH-390]
 * Added "Deregister" button to UI [GH-364]
 * Added `enable_truncate` DNS configuration flag [GH-376]
@@ -417,7 +427,7 @@ BUG FIXES:
 * Fixed handling of `-rejoin` flag
 * Restored 0.2 TLS behavior, thanks to @nelhage [GH-233]
 * Fix the statsite flags, thanks to @nelhage [GH-243]
-* Fixed filters on criticial / non-passing checks [GH-241]
+* Fixed filters on critical / non-passing checks [GH-241]
 * Fixed initial log compaction crash [GH-297]

 IMPROVEMENTS:
@@ -448,7 +458,7 @@ IMPROVEMENTS:
 * `info` includes build version information
 * Sorted results for datacneter list [GH-198]
 * Switch multiplexing to yamux
-* Allow multiple CA certis in ca_file [GH-174]
+* Allow multiple CA certs in ca_file [GH-174]
 * Enable logging to syslog. [GH-105]
 * Allow raw key value lookup [GH-150]
 * Log encryption enabled [GH-151]
@@ -500,7 +510,7 @@ BUG FIXES:
 * Windows agents won't show "failed to decode" errors on every RPC
   request.
 * Fixed memory leak with RPC clients. [GH-149]
-* Serf name conflict resoultion disabled. [GH-97]
+* Serf name conflict resolution disabled. [GH-97]
 * Raft deadlock possibility fixed. [GH-141]

 MISC:
@@ -523,7 +533,7 @@ FEATURES:
   allow for higher throughput and read scalability. [GH-68]
 * /v1/health/service/ endpoint can take an optional `?passing` flag
   to filter to only nodes with passing results. [GH-57]
-* The KV endpoint suports listing keys with the `?keys` query parameter,
+* The KV endpoint supports listing keys with the `?keys` query parameter,
   and limited up to a separator using `?separator=`.

 IMPROVEMENTS:

View File

@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/consul/consul"
 	"github.com/hashicorp/consul/consul/state"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )
@@ -600,8 +601,8 @@ func (a *Agent) sendCoordinate() {
 	for {
 		rate := a.config.SyncCoordinateRateTarget
 		min := a.config.SyncCoordinateIntervalMin
-		intv := rateScaledInterval(rate, min, len(a.LANMembers()))
-		intv = intv + randomStagger(intv)
+		intv := lib.RateScaledInterval(rate, min, len(a.LANMembers()))
+		intv = intv + lib.RandomStagger(intv)

 		select {
 		case <-time.After(intv):

View File

@@ -15,6 +15,7 @@ import (
 	"github.com/armon/circbuf"
 	docker "github.com/fsouza/go-dockerclient"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/go-cleanhttp"
 )
@@ -131,7 +132,7 @@ func (c *CheckMonitor) Stop() {
 // run is invoked by a goroutine to run until Stop() is called
 func (c *CheckMonitor) run() {
 	// Get the randomized initial pause time
-	initialPauseTime := randomStagger(c.Interval)
+	initialPauseTime := lib.RandomStagger(c.Interval)
 	c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, c.Script)
 	next := time.After(initialPauseTime)
 	for {
@@ -366,7 +367,7 @@ func (c *CheckHTTP) Stop() {
 // run is invoked by a goroutine to run until Stop() is called
 func (c *CheckHTTP) run() {
 	// Get the randomized initial pause time
-	initialPauseTime := randomStagger(c.Interval)
+	initialPauseTime := lib.RandomStagger(c.Interval)
 	c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
 	next := time.After(initialPauseTime)
 	for {
@@ -482,7 +483,7 @@ func (c *CheckTCP) Stop() {
 // run is invoked by a goroutine to run until Stop() is called
 func (c *CheckTCP) run() {
 	// Get the randomized initial pause time
-	initialPauseTime := randomStagger(c.Interval)
+	initialPauseTime := lib.RandomStagger(c.Interval)
 	c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
 	next := time.After(initialPauseTime)
 	for {
@@ -580,7 +581,7 @@ func (c *CheckDocker) Stop() {
 // run is invoked by a goroutine to run until Stop() is called
 func (c *CheckDocker) run() {
 	// Get the randomized initial pause time
-	initialPauseTime := randomStagger(c.Interval)
+	initialPauseTime := lib.RandomStagger(c.Interval)
 	c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
 	next := time.After(initialPauseTime)
 	for {

View File

@@ -16,6 +16,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/armon/go-metrics/datadog"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/watch"
 	"github.com/hashicorp/go-checkpoint"
 	"github.com/hashicorp/go-reap"
@@ -424,7 +425,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
 		// Do an immediate check within the next 30 seconds
 		go func() {
-			time.Sleep(randomStagger(30 * time.Second))
+			time.Sleep(lib.RandomStagger(30 * time.Second))
 			c.checkpointResults(checkpoint.Check(updateParams))
 		}()
 	}

View File

@@ -13,6 +13,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/watch"
 	"github.com/mitchellh/mapstructure"
 )
@@ -634,7 +635,7 @@ func DecodeConfig(r io.Reader) (*Config, error) {
 	allowedKeys := []string{"service", "services", "check", "checks"}
 	var unused []string
 	for _, field := range md.Unused {
-		if !strContains(allowedKeys, field) {
+		if !lib.StrContains(allowedKeys, field) {
 			unused = append(unused, field)
 		}
 	}

View File

@@ -11,6 +11,8 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"github.com/hashicorp/consul/lib"
 )

 func TestConfigEncryptBytes(t *testing.T) {
@@ -1103,7 +1105,7 @@ func TestDecodeConfig_Service(t *testing.T) {
 		t.Fatalf("bad: %v", serv)
 	}

-	if !strContains(serv.Tags, "master") {
+	if !lib.StrContains(serv.Tags, "master") {
 		t.Fatalf("bad: %v", serv)
 	}

View File

@@ -10,6 +10,7 @@ import (
 	"github.com/hashicorp/consul/consul"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 )

 const (
@@ -252,7 +253,7 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
 	if l.config.CheckUpdateInterval > 0 && check.Status == status {
 		check.Output = output
 		if _, ok := l.deferCheck[checkID]; !ok {
-			intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + randomStagger(l.config.CheckUpdateInterval)
+			intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
 			deferSync := time.AfterFunc(intv, func() {
 				l.Lock()
 				if _, ok := l.checkStatus[checkID]; ok {
@@ -302,11 +303,11 @@ SYNC:
 		case <-l.consulCh:
 			// Stagger the retry on leader election, avoid a thundering heard
 			select {
-			case <-time.After(randomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
+			case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
 			case <-shutdownCh:
 				return
 			}
-		case <-time.After(syncRetryIntv + randomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
+		case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
 		case <-shutdownCh:
 			return
 		}
@@ -317,7 +318,7 @@ SYNC:
 	// Schedule the next full sync, with a random stagger
 	aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
-	aeIntv = aeIntv + randomStagger(aeIntv)
+	aeIntv = aeIntv + lib.RandomStagger(aeIntv)
 	aeTimer := time.After(aeIntv)

 	// Wait for sync events

View File

@@ -3,6 +3,7 @@ package agent
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"os"
 	"reflect"
 	"testing"
@@ -10,8 +11,17 @@ import (
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/consul/testutil"
+	"github.com/hashicorp/go-uuid"
 )

+func generateUUID() (ret string) {
+	var err error
+	if ret, err = uuid.GenerateUUID(); err != nil {
+		panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
+	}
+	return ret
+}
+
 func TestRexecWriter(t *testing.T) {
 	writer := &rexecWriter{
 		BufCh: make(chan []byte, 16),

View File

@@ -5,6 +5,7 @@ import (
 	"regexp"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/go-uuid"
 )

 const (
@@ -78,7 +79,10 @@ func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
 	}

 	// Format message
-	params.ID = generateUUID()
+	var err error
+	if params.ID, err = uuid.GenerateUUID(); err != nil {
+		return fmt.Errorf("UUID generation failed: %v", err)
+	}
 	params.Version = userEventMaxVersion
 	payload, err := encodeMsgPack(&params)
 	if err != nil {

View File

@@ -3,10 +3,8 @@ package agent
 import (
 	"bytes"
 	"crypto/md5"
-	crand "crypto/rand"
 	"fmt"
 	"math"
-	"math/rand"
 	"os"
 	"os/exec"
 	"os/user"
@@ -39,32 +37,6 @@ func aeScale(interval time.Duration, n int) time.Duration {
 	return time.Duration(multiplier) * interval
 }

-// rateScaledInterval is used to choose an interval to perform an action in order
-// to target an aggregate number of actions per second across the whole cluster.
-func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
-	interval := time.Duration(float64(time.Second) * float64(n) / rate)
-	if interval < min {
-		return min
-	}
-	return interval
-}
-
-// Returns a random stagger interval between 0 and the duration
-func randomStagger(intv time.Duration) time.Duration {
-	return time.Duration(uint64(rand.Int63()) % uint64(intv))
-}
-
-// strContains checks if a list contains a string
-func strContains(l []string, s string) bool {
-	for _, v := range l {
-		if v == s {
-			return true
-		}
-	}
-	return false
-}
-
 // ExecScript returns a command to execute a script
 func ExecScript(script string) (*exec.Cmd, error) {
 	var shell, flag string
@@ -82,21 +54,6 @@ func ExecScript(script string) (*exec.Cmd, error) {
 	return cmd, nil
 }

-// generateUUID is used to generate a random UUID
-func generateUUID() string {
-	buf := make([]byte, 16)
-	if _, err := crand.Read(buf); err != nil {
-		panic(fmt.Errorf("failed to read random bytes: %v", err))
-	}
-
-	return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
-		buf[0:4],
-		buf[4:6],
-		buf[6:8],
-		buf[8:10],
-		buf[10:16])
-}
-
 // decodeMsgPack is used to decode a MsgPack encoded object
 func decodeMsgPack(buf []byte, out interface{}) error {
 	return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)

View File

@@ -24,39 +24,6 @@ func TestAEScale(t *testing.T) {
 	}
 }

-func TestRateScaledInterval(t *testing.T) {
-	min := 1 * time.Second
-	rate := 200.0
-	if v := rateScaledInterval(rate, min, 0); v != min {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := rateScaledInterval(rate, min, 100); v != min {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := rateScaledInterval(rate, min, 200); v != 1*time.Second {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second {
-		t.Fatalf("Bad: %v", v)
-	}
-	if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second {
-		t.Fatalf("Bad: %v", v)
-	}
-}
-
-func TestRandomStagger(t *testing.T) {
-	intv := time.Minute
-	for i := 0; i < 10; i++ {
-		stagger := randomStagger(intv)
-		if stagger < 0 || stagger >= intv {
-			t.Fatalf("Bad: %v", stagger)
-		}
-	}
-}
-
 func TestStringHash(t *testing.T) {
 	in := "hello world"
 	expected := "5eb63bbbe01eeed093cb22bb8f5acdc3"

View File

@@ -7,6 +7,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/go-uuid"
 )

 // ACL endpoint is used to manipulate ACLs
@@ -62,7 +63,11 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
 		if args.ACL.ID == "" {
 			state := a.srv.fsm.State()
 			for {
-				args.ACL.ID = generateUUID()
+				if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
+					a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
+					return err
+				}
+
 				_, acl, err := state.ACLGet(args.ACL.ID)
 				if err != nil {
 					a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)

View File

@@ -7,6 +7,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 )
@@ -436,7 +437,7 @@ func TestACLEndpoint_List(t *testing.T) {
 		if s.ID == anonymousToken || s.ID == "root" {
 			continue
 		}
-		if !strContains(ids, s.ID) {
+		if !lib.StrContains(ids, s.ID) {
 			t.Fatalf("bad: %v", s)
 		}
 		if s.Name != "User token" {

View File

@@ -9,6 +9,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 )
@@ -978,7 +979,7 @@ func TestCatalogNodeServices(t *testing.T) {
 		t.Fatalf("bad: %v", out)
 	}
 	services := out.NodeServices.Services
-	if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
+	if !lib.StrContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
 		t.Fatalf("bad: %v", out)
 	}
 	if len(services["web"].Tags) != 0 || services["web"].Port != 80 {

View File

@@ -12,14 +12,51 @@
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )

 const (
-	// clientRPCCache controls how long we keep an idle connection
-	// open to a server
-	clientRPCCache = 30 * time.Second
+	// clientRPCMinReuseDuration controls the minimum amount of time RPC
+	// queries are sent over an established connection to a single server
+	clientRPCMinReuseDuration = 120 * time.Second
+
+	// clientRPCJitterFraction determines the amount of jitter added to
+	// clientRPCMinReuseDuration before a connection is expired and a new
+	// connection is established in order to rebalance load across consul
+	// servers. The cluster-wide number of connections per second from
+	// rebalancing is applied after this jitter to ensure the CPU impact
+	// is always finite. See newRebalanceConnsPerSecPerServer's comment
+	// for additional commentary.
+	//
+	// For example, in a 10K consul cluster with 5x servers, this default
+	// averages out to ~13 new connections from rebalancing per server
+	// per second (each connection is reused for 120s to 180s).
+	clientRPCJitterFraction = 2
+
+	// Limit the number of new connections a server receives per second
+	// for connection rebalancing. This limit caps the load caused by
+	// continual rebalancing efforts when a cluster is in equilibrium. A
+	// lower value comes at the cost of increased recovery time after a
+	// partition. This parameter begins to take effect when there are
+	// more than ~48K clients querying 5x servers or at lower server
+	// values when there is a partition.
+	//
+	// For example, in a 100K consul cluster with 5x servers, it will
+	// take ~5min for all servers to rebalance their connections. If
+	// 99,995 agents are in the minority talking to only one server, it
+	// will take ~26min for all servers to rebalance. A 10K cluster in
+	// the same scenario will take ~2.6min to rebalance.
+	newRebalanceConnsPerSecPerServer = 64
+
+	// clientRPCConnMaxIdle controls how long we keep an idle connection
+	// open to a server. 127s was chosen as the first prime above 120s
+	// (arbitrarily chose to use a prime) with the intent of reusing
+	// connections who are used by once-a-minute cron(8) jobs *and* who
+	// use a 60s jitter window (e.g. in vixie cron job execution can
+	// drift by up to 59s per job, or 119s for a once-a-minute cron job).
+	clientRPCConnMaxIdle = 127 * time.Second

 	// clientMaxStreams controls how many idle streams we keep
 	// open to a server
@@ -56,6 +93,10 @@ type Client struct {
 	lastServer  *serverParts
 	lastRPCTime time.Time

+	// connRebalanceTime is the time at which we should change the server
+	// we query for RPC requests.
+	connRebalanceTime time.Time
+
 	// Logger uses the provided LogOutput
 	logger *log.Logger
@@ -103,7 +144,7 @@ func NewClient(config *Config) (*Client, error) {
 	// Create server
 	c := &Client{
 		config:     config,
-		connPool:   NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
+		connPool:   NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
 		eventCh:    make(chan serf.Event, 256),
 		logger:     logger,
 		shutdownCh: make(chan struct{}),
@@ -328,37 +369,64 @@ func (c *Client) localEvent(event serf.UserEvent) {
 // RPC is used to forward an RPC call to a consul server, or fail if no servers
 func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
-	// Check the last rpc time
-	var server *serverParts
-	if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
-		server = c.lastServer
-		if server != nil {
-			goto TRY_RPC
-		}
-	}
+	// Check to make sure we haven't spent too much time querying a
+	// single server
+	now := time.Now()
+	if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
+		c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
+		c.lastServer = nil
+	}
+
+	// Allocate these vars on the stack before the goto
+	var numConsulServers int
+	var clusterWideRebalanceConnsPerSec float64
+	var connReuseLowWaterMark time.Duration
+	var numLANMembers int
+
+	// Check the last RPC time, continue to reuse cached connection for
+	// up to clientRPCMinReuseDuration unless exceeded
+	// clientRPCConnMaxIdle
+	lastRPCTime := now.Sub(c.lastRPCTime)
+	var server *serverParts
+	if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
+		server = c.lastServer
+		goto TRY_RPC
+	}

 	// Bail if we can't find any servers
 	c.consulLock.RLock()
-	if len(c.consuls) == 0 {
+	numConsulServers = len(c.consuls)
+	if numConsulServers == 0 {
 		c.consulLock.RUnlock()
 		return structs.ErrNoServers
 	}

 	// Select a random addr
-	server = c.consuls[rand.Int31()%int32(len(c.consuls))]
+	server = c.consuls[rand.Int31n(int32(numConsulServers))]
 	c.consulLock.RUnlock()

+	// Limit this connection's life based on the size (and health) of the
+	// cluster. Never rebalance a connection more frequently than
+	// connReuseLowWaterMark, and make sure we never exceed
+	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
+	clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
+	connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
+	numLANMembers = len(c.LANMembers())
+	c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
+	c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime)
+
 	// Forward to remote Consul
 TRY_RPC:
 	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
-		c.lastServer = nil
+		c.connRebalanceTime = time.Time{}
 		c.lastRPCTime = time.Time{}
+		c.lastServer = nil
 		return err
 	}

 	// Cache the last server
 	c.lastServer = server
-	c.lastRPCTime = time.Now()
+	c.lastRPCTime = now
 	return nil
 }
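To make the comment math above concrete, here is a minimal sketch (not part of this commit) that reproduces the connection-expiry calculation from `RPC` using the `lib` helpers introduced in this change; the 10,000-agent, 5-server cluster shape is the assumed example from the `clientRPCJitterFraction` comment:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/lib"
)

func main() {
	// Assumed example cluster: 5 Consul servers, 10,000 LAN members.
	const numServers, numLANMembers = 5, 10000

	// Cluster-wide cap: newRebalanceConnsPerSecPerServer (64) per server.
	clusterWideRebalanceConnsPerSec := float64(numServers * 64)

	// 120s minimum reuse plus 0-60s of jitter, i.e. 120s to 180s (~150s mean).
	connReuseLowWaterMark := 120*time.Second + lib.RandomStagger(120*time.Second/2)

	// 10000/320 ≈ 31s is below the low-water mark, so the mark wins: each
	// client rotates roughly every 150s, ~67 new conns/s cluster-wide,
	// or ~13 per server, matching the constant comments above.
	fmt.Println(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec,
		connReuseLowWaterMark, numLANMembers))
}
```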

View File

@@ -2,12 +2,15 @@ package consul

 import (
 	"bytes"
+	"fmt"
 	"os"
 	"reflect"
 	"testing"

 	"github.com/hashicorp/consul/consul/state"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
+	"github.com/hashicorp/go-uuid"
 	"github.com/hashicorp/raft"
 )
@@ -38,6 +41,14 @@ func makeLog(buf []byte) *raft.Log {
 	}
 }

+func generateUUID() (ret string) {
+	var err error
+	if ret, err = uuid.GenerateUUID(); err != nil {
+		panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
+	}
+	return ret
+}
+
 func TestFSM_RegisterNode(t *testing.T) {
 	fsm, err := NewFSM(nil, os.Stderr)
 	if err != nil {
@@ -452,7 +463,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
 	if len(fooSrv.Services) != 2 {
 		t.Fatalf("Bad: %v", fooSrv)
 	}
-	if !strContains(fooSrv.Services["db"].Tags, "primary") {
+	if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
 		t.Fatalf("Bad: %v", fooSrv)
 	}
 	if fooSrv.Services["db"].Port != 5000 {

View File

@@ -6,6 +6,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 )
@@ -377,10 +378,10 @@ func TestHealth_ServiceNodes(t *testing.T) {
 	if nodes[1].Node.Node != "foo" {
 		t.Fatalf("Bad: %v", nodes[1])
 	}
-	if !strContains(nodes[0].Service.Tags, "slave") {
+	if !lib.StrContains(nodes[0].Service.Tags, "slave") {
 		t.Fatalf("Bad: %v", nodes[0])
 	}
-	if !strContains(nodes[1].Service.Tags, "master") {
+	if !lib.StrContains(nodes[1].Service.Tags, "master") {
 		t.Fatalf("Bad: %v", nodes[1])
 	}
 	if nodes[0].Checks[0].Status != structs.HealthWarning {

View File

@@ -7,6 +7,7 @@ import (
 	"testing"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 )
@@ -56,7 +57,7 @@ func TestInternal_NodeInfo(t *testing.T) {
 	if nodes[0].Node != "foo" {
 		t.Fatalf("Bad: %v", nodes[0])
 	}
-	if !strContains(nodes[0].Services[0].Tags, "master") {
+	if !lib.StrContains(nodes[0].Services[0].Tags, "master") {
 		t.Fatalf("Bad: %v", nodes[0])
 	}
 	if nodes[0].Checks[0].Status != structs.HealthPassing {
@@ -130,7 +131,7 @@ func TestInternal_NodeDump(t *testing.T) {
 		switch node.Node {
 		case "foo":
 			foundFoo = true
-			if !strContains(node.Services[0].Tags, "master") {
+			if !lib.StrContains(node.Services[0].Tags, "master") {
 				t.Fatalf("Bad: %v", nodes[0])
 			}
 			if node.Checks[0].Status != structs.HealthPassing {
@@ -139,7 +140,7 @@ func TestInternal_NodeDump(t *testing.T) {
 		case "bar":
 			foundBar = true
-			if !strContains(node.Services[0].Tags, "slave") {
+			if !lib.StrContains(node.Services[0].Tags, "slave") {
 				t.Fatalf("Bad: %v", nodes[1])
 			}
 			if node.Checks[0].Status != structs.HealthWarning {

View File

@@ -9,6 +9,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/go-uuid"
 )

 var (
@@ -41,7 +42,9 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
 		// to collide since this isn't inside a write transaction.
 		state := p.srv.fsm.State()
 		for {
-			args.Query.ID = generateUUID()
+			if args.Query.ID, err = uuid.GenerateUUID(); err != nil {
+				return fmt.Errorf("UUID generation for prepared query failed: %v", err)
+			}
 			_, query, err := state.PreparedQueryGet(args.Query.ID)
 			if err != nil {
 				return fmt.Errorf("Prepared query lookup failed: %v", err)

View File

@@ -12,6 +12,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/consul/state"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/memberlist"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/yamux"
@@ -329,7 +330,7 @@ func (s *Server) blockingRPC(queryOpts *structs.QueryOptions, queryMeta *structs
 	}

 	// Apply a small amount of jitter to the request.
-	queryOpts.MaxQueryTime += randomStagger(queryOpts.MaxQueryTime / jitterFraction)
+	queryOpts.MaxQueryTime += lib.RandomStagger(queryOpts.MaxQueryTime / jitterFraction)

 	// Setup a query timeout.
 	timeout = time.NewTimer(queryOpts.MaxQueryTime)

View File

@@ -6,6 +6,7 @@ import (
 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/go-uuid"
 )

 // Session endpoint is used to manipulate sessions for KV
@@ -61,7 +62,11 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
 		// Generate a new session ID, verify uniqueness
 		state := s.srv.fsm.State()
 		for {
-			args.Session.ID = generateUUID()
+			var err error
+			if args.Session.ID, err = uuid.GenerateUUID(); err != nil {
+				s.srv.logger.Printf("[ERR] consul.session: UUID generation failed: %v", err)
+				return err
+			}
 			_, sess, err := state.SessionGet(args.Session.ID)
 			if err != nil {
 				s.srv.logger.Printf("[ERR] consul.session: Session lookup failed: %v", err)

View File

@@ -6,6 +6,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/testutil"
 	"github.com/hashicorp/net-rpc-msgpackrpc"
 )
@@ -217,7 +218,7 @@ func TestSessionEndpoint_List(t *testing.T) {
 	}
 	for i := 0; i < len(sessions.Sessions); i++ {
 		s := sessions.Sessions[i]
-		if !strContains(ids, s.ID) {
+		if !lib.StrContains(ids, s.ID) {
 			t.Fatalf("bad: %v", s)
 		}
 		if s.Node != "foo" {
@@ -318,7 +319,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
 	}
 	for i := 0; i < len(sessions.Sessions); i++ {
 		s := sessions.Sessions[i]
-		if !strContains(ids, s.ID) {
+		if !lib.StrContains(ids, s.ID) {
 			t.Fatalf("bad: %v", s)
 		}
 		if s.Node != "foo" {
@@ -352,7 +353,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
 	}

 	s := session.Sessions[0]
-	if !strContains(ids, s.ID) {
+	if !lib.StrContains(ids, s.ID) {
 		t.Fatalf("bad: %v", s)
 	}
 	if s.Node != "foo" {
@@ -379,7 +380,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
 	for i := 0; i < len(sessionsL1.Sessions); i++ {
 		s := sessionsL1.Sessions[i]
-		if !strContains(ids, s.ID) {
+		if !lib.StrContains(ids, s.ID) {
 			t.Fatalf("bad: %v", s)
 		}
 		if s.Node != "foo" {
@@ -411,7 +412,7 @@ func TestSessionEndpoint_Renew(t *testing.T) {
 	if len(sessionsL2.Sessions) != 0 {
 		for i := 0; i < len(sessionsL2.Sessions); i++ {
 			s := sessionsL2.Sessions[i]
-			if !strContains(ids, s.ID) {
+			if !lib.StrContains(ids, s.ID) {
 				t.Fatalf("bad: %v", s)
 			}
 			if s.Node != "foo" {
@@ -476,7 +477,7 @@ func TestSessionEndpoint_NodeSessions(t *testing.T) {
 	}
 	for i := 0; i < len(sessions.Sessions); i++ {
 		s := sessions.Sessions[i]
-		if !strContains(ids, s.ID) {
+		if !lib.StrContains(ids, s.ID) {
 			t.Fatalf("bad: %v", s)
 		}
 		if s.Node != "foo" {

View File

@@ -11,6 +11,7 @@ import (
 	"time"

 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 )
@@ -1189,16 +1190,6 @@ func TestStateStore_Services(t *testing.T) {
 	}
 }

-// strContains checks if a list contains a string
-func strContains(l []string, s string) bool {
-	for _, v := range l {
-		if v == s {
-			return true
-		}
-	}
-	return false
-}
-
 func TestStateStore_ServiceNodes(t *testing.T) {
 	s := testStateStore(t)
@@ -1249,7 +1240,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
 	if nodes[0].ServiceID != "db" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[0].ServiceTags, "slave") {
+	if !lib.StrContains(nodes[0].ServiceTags, "slave") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[0].ServicePort != 8000 {
@@ -1265,7 +1256,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
 	if nodes[1].ServiceID != "db2" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[1].ServiceTags, "slave") {
+	if !lib.StrContains(nodes[1].ServiceTags, "slave") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[1].ServicePort != 8001 {
@@ -1281,7 +1272,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
 	if nodes[2].ServiceID != "db" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[2].ServiceTags, "master") {
+	if !lib.StrContains(nodes[2].ServiceTags, "master") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[2].ServicePort != 8000 {
@@ -1328,7 +1319,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
 	if nodes[0].Address != "127.0.0.1" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[0].ServiceTags, "master") {
+	if !lib.StrContains(nodes[0].ServiceTags, "master") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[0].ServicePort != 8000 {
@@ -1375,7 +1366,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
 	if nodes[0].Address != "127.0.0.1" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[0].ServiceTags, "master") {
+	if !lib.StrContains(nodes[0].ServiceTags, "master") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[0].ServicePort != 8000 {
@@ -1409,7 +1400,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
 	if nodes[0].Address != "127.0.0.1" {
 		t.Fatalf("bad: %v", nodes)
 	}
-	if !strContains(nodes[0].ServiceTags, "dev") {
+	if !lib.StrContains(nodes[0].ServiceTags, "dev") {
 		t.Fatalf("bad: %v", nodes)
 	}
 	if nodes[0].ServicePort != 8001 {

View File

@@ -1,17 +1,13 @@
 package consul

 import (
-	crand "crypto/rand"
 	"encoding/binary"
 	"fmt"
-	"math/rand"
 	"net"
 	"os"
 	"path/filepath"
 	"runtime"
 	"strconv"
-	"strings"
-	"time"

 	"github.com/hashicorp/serf/serf"
 )
@@ -83,24 +79,6 @@ func init() {
 	privateBlocks[5] = block
 }

-// strContains checks if a list contains a string
-func strContains(l []string, s string) bool {
-	for _, v := range l {
-		if v == s {
-			return true
-		}
-	}
-	return false
-}
-
-func ToLowerList(l []string) []string {
-	var out []string
-	for _, value := range l {
-		out = append(out, strings.ToLower(value))
-	}
-	return out
-}
-
 // ensurePath is used to make sure a path exists
 func ensurePath(path string, dir bool) error {
 	if !dir {
@@ -309,23 +287,3 @@ func runtimeStats() map[string]string {
 		"cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10),
 	}
 }

-// generateUUID is used to generate a random UUID
-func generateUUID() string {
-	buf := make([]byte, 16)
-	if _, err := crand.Read(buf); err != nil {
-		panic(fmt.Errorf("failed to read random bytes: %v", err))
-	}
-
-	return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
-		buf[0:4],
-		buf[4:6],
-		buf[6:8],
-		buf[8:10],
-		buf[10:16])
-}
-
-// Returns a random stagger interval between 0 and the duration
-func randomStagger(intv time.Duration) time.Duration {
-	return time.Duration(uint64(rand.Int63()) % uint64(intv))
-}

View File

@@ -6,30 +6,10 @@ import (
 	"net"
 	"regexp"
 	"testing"
-	"time"

 	"github.com/hashicorp/serf/serf"
 )

-func TestStrContains(t *testing.T) {
-	l := []string{"a", "b", "c"}
-	if !strContains(l, "b") {
-		t.Fatalf("should contain")
-	}
-	if strContains(l, "d") {
-		t.Fatalf("should not contain")
-	}
-}
-
-func TestToLowerList(t *testing.T) {
-	l := []string{"ABC", "Abc", "abc"}
-	for _, value := range ToLowerList(l) {
-		if value != "abc" {
-			t.Fatalf("failed lowercasing")
-		}
-	}
-}
-
 func TestGetPrivateIP(t *testing.T) {
 	ip, _, err := net.ParseCIDR("10.1.2.3/32")
 	if err != nil {
@@ -295,13 +275,3 @@ func TestGenerateUUID(t *testing.T) {
 		}
 	}
 }

-func TestRandomStagger(t *testing.T) {
-	intv := time.Minute
-	for i := 0; i < 10; i++ {
-		stagger := randomStagger(intv)
-		if stagger < 0 || stagger >= intv {
-			t.Fatalf("Bad: %v", stagger)
-		}
-	}
-}

View File

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 set -e

 ZSH_FUNC_DIR="/usr/share/zsh/site-functions"

lib/cluster.go Normal file
View File

@@ -0,0 +1,23 @@
+package lib
+
+import (
+	"math/rand"
+	"time"
+)
+
+// Returns a random stagger interval between 0 and the duration
+func RandomStagger(intv time.Duration) time.Duration {
+	return time.Duration(uint64(rand.Int63()) % uint64(intv))
+}
+
+// RateScaledInterval is used to choose an interval to perform an action in
+// order to target an aggregate number of actions per second across the whole
+// cluster.
+func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
+	interval := time.Duration(float64(time.Second) * float64(n) / rate)
+	if interval < min {
+		return min
+	}
+	return interval
+}

lib/cluster_test.go Normal file
View File

@@ -0,0 +1,39 @@
+package lib
+
+import (
+	"testing"
+	"time"
+)
+
+func TestRandomStagger(t *testing.T) {
+	intv := time.Minute
+	for i := 0; i < 10; i++ {
+		stagger := RandomStagger(intv)
+		if stagger < 0 || stagger >= intv {
+			t.Fatalf("Bad: %v", stagger)
+		}
+	}
+}
+
+func TestRateScaledInterval(t *testing.T) {
+	min := 1 * time.Second
+	rate := 200.0
+	if v := RateScaledInterval(rate, min, 0); v != min {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := RateScaledInterval(rate, min, 100); v != min {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := RateScaledInterval(rate, min, 200); v != 1*time.Second {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := RateScaledInterval(rate, min, 1000); v != 5*time.Second {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := RateScaledInterval(rate, min, 5000); v != 25*time.Second {
+		t.Fatalf("Bad: %v", v)
+	}
+	if v := RateScaledInterval(rate, min, 10000); v != 50*time.Second {
+		t.Fatalf("Bad: %v", v)
+	}
+}

lib/rand.go Normal file
View File

@@ -0,0 +1,18 @@
+package lib
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+var (
+	once sync.Once
+)
+
+// SeedMathRand provides weak, but guaranteed seeding, which is better than
+// running with Go's default seed of 1. A call to SeedMathRand() is expected
+// to be called via init(), but never a second time.
+func SeedMathRand() {
+	once.Do(func() { rand.Seed(time.Now().UTC().UnixNano()) })
+}

lib/string.go Normal file
View File

@@ -0,0 +1,11 @@
+package lib
+
+// StrContains checks if a list contains a string
+func StrContains(l []string, s string) bool {
+	for _, v := range l {
+		if v == s {
+			return true
+		}
+	}
+	return false
+}

lib/string_test.go Normal file
View File

@@ -0,0 +1,15 @@
+package lib
+
+import (
+	"testing"
+)
+
+func TestStrContains(t *testing.T) {
+	l := []string{"a", "b", "c"}
+	if !StrContains(l, "b") {
+		t.Fatalf("should contain")
+	}
+	if StrContains(l, "d") {
+		t.Fatalf("should not contain")
+	}
+}

View File

@@ -6,8 +6,14 @@ import (
 	"io/ioutil"
 	"log"
 	"os"
+
+	"github.com/hashicorp/consul/lib"
 )

+func init() {
+	lib.SeedMathRand()
+}
+
 func main() {
 	os.Exit(realMain())
 }

View File

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 #
 # This script builds the application from source for multiple platforms.
 set -e

View File

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 set -e

 # Get the version from the command line

View File

@@ -1,12 +1,12 @@
-#!/bin/bash
+#!/usr/bin/env bash

-grep generateUUID consul/state/state_store.go
+grep GenerateUUID consul/state/state_store.go
 RESULT=$?
 if [ $RESULT -eq 0 ]; then
   exit 1
 fi

-grep generateUUID consul/fsm.go
+grep GenerateUUID consul/fsm.go
 RESULT=$?
 if [ $RESULT -eq 0 ]; then
   exit 1

View File

@@ -5,10 +5,10 @@ setlocal
 if not exist %1\consul\state\state_store.go exit /B 1
 if not exist %1\consul\fsm.go exit /B 1

-findstr /R generateUUID %1\consul\state\state_store.go 1>nul
+findstr /R GenerateUUID %1\consul\state\state_store.go 1>nul
 if not %ERRORLEVEL% EQU 1 exit /B 1

-findstr generateUUID %1\consul\fsm.go 1>nul
+findstr GenerateUUID %1\consul\fsm.go 1>nul
 if not %ERRORLEVEL% EQU 1 exit /B 1

 exit /B 0

View File

@@ -34,7 +34,7 @@
         "bundle check || bundle install --jobs 7",
         "bundle exec middleman build",
-        "/bin/bash ./scripts/deploy.sh"
+        "/usr/bin/env bash ./scripts/deploy.sh"
       ]
     }
   ]

View File

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 set -e

 PROJECT="consul"

View File

@@ -299,7 +299,7 @@ a JSON body will be returned like this:
       },
       "Datacenter": "dc3",
       "Failovers": 2
-    }
+    }]
 }
 ```

View File

@@ -73,13 +73,56 @@ DNS on port 8600.

 ### Dnsmasq Setup

-Dnsmasq is typically configured via files in the `/etc/dnsmasq.d` directory. To configure Consul, create the file `/etc/dnsmasq.d/10-consul` with the following contents:
+Dnsmasq is typically configured via a `dnsmasq.conf` or a series of files in
+the `/etc/dnsmasq.d` directory. In Dnsmasq's configuration file
+(e.g. `/etc/dnsmasq.d/10-consul`), add the following:

 ```text
+# Enable forward lookup of the 'consul' domain:
 server=/consul/127.0.0.1#8600
+
+# Uncomment and modify as appropriate to enable reverse DNS lookups for
+# common netblocks found in RFC 1918, 5735, and 6598:
+#rev-server=0.0.0.0/8,127.0.0.1#8600
+#rev-server=10.0.0.0/8,127.0.0.1#8600
+#rev-server=100.64.0.0/10,127.0.0.1#8600
+#rev-server=127.0.0.1/8,127.0.0.1#8600
+#rev-server=169.254.0.0/16,127.0.0.1#8600
+#rev-server=172.16.0.0/12,127.0.0.1#8600
+#rev-server=192.168.0.0/16,127.0.0.1#8600
+#rev-server=224.0.0.0/4,127.0.0.1#8600
+#rev-server=240.0.0.0/4,127.0.0.1#8600
 ```

-Once that configuration is created, restart the dnsmasq service.
+Once that configuration is created, restart the `dnsmasq` service.
+
+Additional useful settings in `dnsmasq` to consider include (see
+[`dnsmasq(8)`](http://www.thekelleys.org.uk/dnsmasq/docs/dnsmasq-man.html)
+for additional details):
+
+```
+# Accept DNS queries only from hosts whose address is on a local subnet.
+#local-service
+
+# Don't poll /etc/resolv.conf for changes.
+#no-poll
+
+# Don't read /etc/resolv.conf. Get upstream servers only from the command
+# line or the dnsmasq configuration file (see the "server" directive below).
+#no-resolv
+
+# Specify IP address(es) of other DNS servers for queries not handled
+# directly by consul. There is normally one 'server' entry set for every
+# 'nameserver' parameter found in '/etc/resolv.conf'. See dnsmasq(8)'s
+# 'server' configuration option for details.
+#server=1.2.3.4
+#server=208.67.222.222
+#server=8.8.8.8
+
+# Set the size of dnsmasq's cache. The default is 150 names. Setting the
+# cache size to zero disables caching.
+#cache-size=65536
+```

 ### Testing
@@ -107,7 +150,8 @@ master.redis.service.dc-1.consul. 0 IN A 172.31.3.234
 ;; MSG SIZE rcvd: 76
 ```

-Then run the same query against your BIND instance and make sure you get a result:
+Then run the same query against your BIND instance and make sure you get a
+valid result:

 ```text
 [root@localhost ~]# dig @localhost -p 53 master.redis.service.dc-1.consul. A
@@ -131,10 +175,40 @@ master.redis.service.dc-1.consul. 0 IN A 172.31.3.234
 ;; MSG SIZE rcvd: 76
 ```

+If desired, verify reverse DNS using the same methodology:
+
+```text
+[root@localhost ~]# dig @127.0.0.1 -p 8600 133.139.16.172.in-addr.arpa. PTR
+
+; <<>> DiG 9.10.3-P3 <<>> @127.0.0.1 -p 8600 133.139.16.172.in-addr.arpa. PTR
+; (1 server found)
+;; global options: +cmd
+;; Got answer:
+;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 3713
+;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0
+;; WARNING: recursion requested but not available
+
+;; QUESTION SECTION:
+;133.139.16.172.in-addr.arpa. IN PTR
+
+;; ANSWER SECTION:
+133.139.16.172.in-addr.arpa. 0 IN PTR consul1.node.dc1.consul.
+
+;; Query time: 3 msec
+;; SERVER: 127.0.0.1#8600(127.0.0.1)
+;; WHEN: Sun Jan 31 04:25:39 UTC 2016
+;; MSG SIZE rcvd: 109
+
+[root@localhost ~]# dig @127.0.0.1 +short -x 172.16.139.133
+consul1.node.dc1.consul.
+```
+
 ### Troubleshooting

-If you don't get an answer from BIND but you do get an answer from Consul, your
-best bet is to turn on BIND's query log to see what's happening:
+If you don't get an answer from your DNS server (e.g. BIND, Dnsmasq) but you
+do get an answer from Consul, your best bet is to turn on your DNS server's
+query log to see what's happening.
+
+For BIND:

 ```text
 [root@localhost ~]# rndc querylog
@@ -152,3 +226,6 @@ This indicates that DNSSEC is not disabled properly.

 If you see errors about network connections, verify that there are no firewall
 or routing problems between the servers running BIND and Consul.
+
+For Dnsmasq, see the `log-queries` configuration option and the `USR1`
+signal.
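As a concrete sketch of that Dnsmasq debugging workflow (assuming the `10-consul` file from the setup section above, and that `pidof` is available on the host), enable query logging and restart the service, or signal a running daemon to dump its cache and forwarding statistics to the log:

```text
# /etc/dnsmasq.d/10-consul -- log every query dnsmasq answers or forwards:
log-queries

# After restarting, or to get statistics from an already-running daemon:
kill -USR1 $(pidof dnsmasq)
```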

View File

@@ -105,6 +105,9 @@ description: |-
     <li>
       <a href="http://xordataexchange.github.io/crypt/">crypt</a> - Store and retrieve encrypted configuration parameters from etcd or Consul
     </li>
+    <li>
+      <a href="https://github.com/smoketurner/dropwizard-consul">Dropwizard Consul Bundle</a> - Service discovery and configuration integration with the <a href="http://www.dropwizard.io/">Dropwizard</a> framework
+    </li>
     <li>
       <a href="https://github.com/progrium/docker-consul">docker-consul</a> - Dockerized Consul Agent
     </li>

View File

@@ -110,7 +110,7 @@ Armons-MacBook-Air.node.dc1.consul. 0 IN A 172.20.20.11
 ```

 The `SRV` record says that the web service is running on port 80 and exists on
-the node `agent-one.node.dc1.consul.`. An additional section is returned by the
+the node `Armons-MacBook-Air.node.dc1.consul.`. An additional section is returned by the
 DNS with the `A` record for that node.

 Finally, we can also use the DNS API to filter services by tags. The

View File

@@ -8,7 +8,7 @@ description: |-
 # Consul & the HashiCorp Ecosystem

-HashiCorp is the creator of the open source projects Vagrant, Packer, Terraform, Serf, and Consul, and the commercial product Atlas. Terraform is just one piece of the ecosystem HashiCorp has built to make application delivery a versioned, auditable, repeatable, and collaborative process. To learn more about our beliefs on the qualities of the modern datacenter and responsible application delivery, read [The Atlas Mindset: Version Control for Infrastructure](https://www.hashicorp.com/blog/atlas-mindset.html?utm_source=consul&utm_campaign=HashicorpEcosystem).
+HashiCorp is the creator of the open source projects Vagrant, Packer, Terraform, Serf, and Consul, and the commercial product Atlas. Consul is just one piece of the ecosystem HashiCorp has built to make application delivery a versioned, auditable, repeatable, and collaborative process. To learn more about our beliefs on the qualities of the modern datacenter and responsible application delivery, read [The Atlas Mindset: Version Control for Infrastructure](https://www.hashicorp.com/blog/atlas-mindset.html?utm_source=consul&utm_campaign=HashicorpEcosystem).

 If you are using Consul for service discovery, its likely that you have a system to deploy infrastructure which Consul is then connecting. Terraform is our tool for creating, combining, and modifying infrastructure.