Merge pull request #1457 from hashicorp/f-monitor-retries
Adds a retry capability to lock monitors in the API client.
This commit is contained in:
commit
b7807b42c1
40
api/lock.go
40
api/lock.go
|
@ -2,6 +2,7 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -22,9 +23,15 @@ const (
|
||||||
|
|
||||||
// DefaultLockRetryTime is how long we wait after a failed lock acquisition
|
// DefaultLockRetryTime is how long we wait after a failed lock acquisition
|
||||||
// before attempting to do the lock again. This is so that once a lock-delay
|
// before attempting to do the lock again. This is so that once a lock-delay
|
||||||
// is in affect, we do not hot loop retrying the acquisition.
|
// is in effect, we do not hot loop retrying the acquisition.
|
||||||
DefaultLockRetryTime = 5 * time.Second
|
DefaultLockRetryTime = 5 * time.Second
|
||||||
|
|
||||||
|
// DefaultMonitorRetryTime is how long we wait after a failed monitor check
|
||||||
|
// of a lock (500 response code). This allows the monitor to ride out brief
|
||||||
|
// periods of unavailability, subject to the MonitorRetries setting in the
|
||||||
|
// lock options which is by default set to 0, disabling this feature.
|
||||||
|
DefaultMonitorRetryTime = 2 * time.Second
|
||||||
|
|
||||||
// LockFlagValue is a magic flag we set to indicate a key
|
// LockFlagValue is a magic flag we set to indicate a key
|
||||||
// is being used for a lock. It is used to detect a potential
|
// is being used for a lock. It is used to detect a potential
|
||||||
// conflict with a semaphore.
|
// conflict with a semaphore.
|
||||||
|
@ -62,11 +69,13 @@ type Lock struct {
|
||||||
|
|
||||||
// LockOptions is used to parameterize the Lock behavior.
|
// LockOptions is used to parameterize the Lock behavior.
|
||||||
type LockOptions struct {
|
type LockOptions struct {
|
||||||
Key string // Must be set and have write permissions
|
Key string // Must be set and have write permissions
|
||||||
Value []byte // Optional, value to associate with the lock
|
Value []byte // Optional, value to associate with the lock
|
||||||
Session string // Optional, created if not specified
|
Session string // Optional, created if not specified
|
||||||
SessionName string // Optional, defaults to DefaultLockSessionName
|
SessionName string // Optional, defaults to DefaultLockSessionName
|
||||||
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
|
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
|
||||||
|
MonitorRetries int // Optional, defaults to 0 which means no retries
|
||||||
|
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// LockKey returns a handle to a lock struct which can be used
|
// LockKey returns a handle to a lock struct which can be used
|
||||||
|
@ -96,6 +105,9 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
|
||||||
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
|
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if opts.MonitorRetryTime == 0 {
|
||||||
|
opts.MonitorRetryTime = DefaultMonitorRetryTime
|
||||||
|
}
|
||||||
l := &Lock{
|
l := &Lock{
|
||||||
c: c,
|
c: c,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
|
@ -327,8 +339,24 @@ func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
|
||||||
kv := l.c.KV()
|
kv := l.c.KV()
|
||||||
opts := &QueryOptions{RequireConsistent: true}
|
opts := &QueryOptions{RequireConsistent: true}
|
||||||
WAIT:
|
WAIT:
|
||||||
|
retries := l.opts.MonitorRetries
|
||||||
|
RETRY:
|
||||||
pair, meta, err := kv.Get(l.opts.Key, opts)
|
pair, meta, err := kv.Get(l.opts.Key, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// TODO (slackpad) - Make a real error type here instead of using
|
||||||
|
// a string check.
|
||||||
|
const serverError = "Unexpected response code: 500"
|
||||||
|
|
||||||
|
// If configured we can try to ride out a brief Consul unavailability
|
||||||
|
// by doing retries. Note that we have to attempt the retry in a non-
|
||||||
|
// blocking fashion so that we have a clean place to reset the retry
|
||||||
|
// counter if service is restored.
|
||||||
|
if retries > 0 && strings.Contains(err.Error(), serverError) {
|
||||||
|
time.Sleep(l.opts.MonitorRetryTime)
|
||||||
|
retries--
|
||||||
|
opts.WaitIndex = 0
|
||||||
|
goto RETRY
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if pair != nil && pair.Session == session {
|
if pair != nil && pair.Session == session {
|
||||||
|
|
119
api/lock_test.go
119
api/lock_test.go
|
@ -2,6 +2,10 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/http/httputil"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -369,3 +373,118 @@ func TestLock_ReclaimLock(t *testing.T) {
|
||||||
t.Fatalf("should not be leader")
|
t.Fatalf("should not be leader")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLock_MonitorRetry(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
raw, s := makeClient(t)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
// Set up a server that always responds with 500 errors.
|
||||||
|
failer := func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
w.WriteHeader(500)
|
||||||
|
}
|
||||||
|
outage := httptest.NewServer(http.HandlerFunc(failer))
|
||||||
|
defer outage.Close()
|
||||||
|
|
||||||
|
// Set up a reverse proxy that will send some requests to the
|
||||||
|
// 500 server and pass everything else through to the real Consul
|
||||||
|
// server.
|
||||||
|
var mutex sync.Mutex
|
||||||
|
errors := 0
|
||||||
|
director := func(req *http.Request) {
|
||||||
|
mutex.Lock()
|
||||||
|
defer mutex.Unlock()
|
||||||
|
|
||||||
|
req.URL.Scheme = "http"
|
||||||
|
if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/lock") {
|
||||||
|
req.URL.Host = outage.URL[7:] // Strip off "http://".
|
||||||
|
errors--
|
||||||
|
} else {
|
||||||
|
req.URL.Host = raw.config.Address
|
||||||
|
}
|
||||||
|
}
|
||||||
|
proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
|
||||||
|
defer proxy.Close()
|
||||||
|
|
||||||
|
// Make another client that points at the proxy instead of the real
|
||||||
|
// Consul server.
|
||||||
|
config := raw.config
|
||||||
|
config.Address = proxy.URL[7:] // Strip off "http://".
|
||||||
|
c, err := NewClient(&config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a lock with retries enabled.
|
||||||
|
opts := &LockOptions{
|
||||||
|
Key: "test/lock",
|
||||||
|
SessionTTL: "60s",
|
||||||
|
MonitorRetries: 3,
|
||||||
|
}
|
||||||
|
lock, err := c.LockOpts(opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the default got set.
|
||||||
|
if lock.opts.MonitorRetryTime != DefaultMonitorRetryTime {
|
||||||
|
t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now set a custom time for the test.
|
||||||
|
opts.MonitorRetryTime = 250 * time.Millisecond
|
||||||
|
lock, err = c.LockOpts(opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if lock.opts.MonitorRetryTime != 250*time.Millisecond {
|
||||||
|
t.Fatalf("bad: %d", lock.opts.MonitorRetryTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should get the lock.
|
||||||
|
leaderCh, err := lock.Lock(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if leaderCh == nil {
|
||||||
|
t.Fatalf("not leader")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Poke the key using the raw client to force the monitor to wake up
|
||||||
|
// and check the lock again. This time we will return errors for some
|
||||||
|
// of the responses.
|
||||||
|
mutex.Lock()
|
||||||
|
errors = 2
|
||||||
|
mutex.Unlock()
|
||||||
|
pair, _, err := raw.KV().Get("test/lock", &QueryOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(5 * opts.MonitorRetryTime)
|
||||||
|
|
||||||
|
// Should still be the leader.
|
||||||
|
select {
|
||||||
|
case <-leaderCh:
|
||||||
|
t.Fatalf("should be leader")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now return an overwhelming number of errors.
|
||||||
|
mutex.Lock()
|
||||||
|
errors = 10
|
||||||
|
mutex.Unlock()
|
||||||
|
if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(5 * opts.MonitorRetryTime)
|
||||||
|
|
||||||
|
// Should lose leadership.
|
||||||
|
select {
|
||||||
|
case <-leaderCh:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("should not be leader")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue