diff --git a/api/lock.go b/api/lock.go index a76685f04..c1f6edf82 100644 --- a/api/lock.go +++ b/api/lock.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "strings" "sync" "time" ) @@ -22,9 +23,15 @@ const ( // DefaultLockRetryTime is how long we wait after a failed lock acquisition // before attempting to do the lock again. This is so that once a lock-delay - // is in affect, we do not hot loop retrying the acquisition. + // is in effect, we do not hot loop retrying the acquisition. DefaultLockRetryTime = 5 * time.Second + // DefaultMonitorRetryTime is how long we wait after a failed monitor check + // of a lock (500 response code). This allows the monitor to ride out brief + // periods of unavailability, subject to the MonitorRetries setting in the + // lock options which is by default set to 0, disabling this feature. + DefaultMonitorRetryTime = 2 * time.Second + // LockFlagValue is a magic flag we set to indicate a key // is being used for a lock. It is used to detect a potential // conflict with a semaphore. @@ -62,11 +69,13 @@ type Lock struct { // LockOptions is used to parameterize the Lock behavior. type LockOptions struct { - Key string // Must be set and have write permissions - Value []byte // Optional, value to associate with the lock - Session string // Optional, created if not specified - SessionName string // Optional, defaults to DefaultLockSessionName - SessionTTL string // Optional, defaults to DefaultLockSessionTTL + Key string // Must be set and have write permissions + Value []byte // Optional, value to associate with the lock + Session string // Optional, created if not specified + SessionName string // Optional, defaults to DefaultLockSessionName + SessionTTL string // Optional, defaults to DefaultLockSessionTTL + MonitorRetries int // Optional, defaults to 0 which means no retries + MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime } // LockKey returns a handle to a lock struct which can be used @@ -96,6 +105,9 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) { return nil, fmt.Errorf("invalid SessionTTL: %v", err) } } + if opts.MonitorRetryTime == 0 { + opts.MonitorRetryTime = DefaultMonitorRetryTime + } l := &Lock{ c: c, opts: opts, @@ -327,8 +339,24 @@ func (l *Lock) monitorLock(session string, stopCh chan struct{}) { kv := l.c.KV() opts := &QueryOptions{RequireConsistent: true} WAIT: + retries := l.opts.MonitorRetries +RETRY: pair, meta, err := kv.Get(l.opts.Key, opts) if err != nil { + // TODO (slackpad) - Make a real error type here instead of using + // a string check. + const serverError = "Unexpected response code: 500" + + // If configured we can try to ride out a brief Consul unavailability + // by doing retries. Note that we have to attempt the retry in a non- + // blocking fashion so that we have a clean place to reset the retry + // counter if service is restored. + if retries > 0 && strings.Contains(err.Error(), serverError) { + time.Sleep(l.opts.MonitorRetryTime) + retries-- + opts.WaitIndex = 0 + goto RETRY + } return } if pair != nil && pair.Session == session { diff --git a/api/lock_test.go b/api/lock_test.go index f4bad9e6b..7d8cbfec3 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -2,6 +2,10 @@ package api import ( "log" + "net/http" + "net/http/httptest" + "net/http/httputil" + "strings" "sync" "testing" "time" @@ -369,3 +373,118 @@ func TestLock_ReclaimLock(t *testing.T) { t.Fatalf("should not be leader") } } + +func TestLock_MonitorRetry(t *testing.T) { + t.Parallel() + raw, s := makeClient(t) + defer s.Stop() + + // Set up a server that always responds with 500 errors. + failer := func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + } + outage := httptest.NewServer(http.HandlerFunc(failer)) + defer outage.Close() + + // Set up a reverse proxy that will send some requests to the + // 500 server and pass everything else through to the real Consul + // server. + var mutex sync.Mutex + errors := 0 + director := func(req *http.Request) { + mutex.Lock() + defer mutex.Unlock() + + req.URL.Scheme = "http" + if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/lock") { + req.URL.Host = outage.URL[7:] // Strip off "http://". + errors-- + } else { + req.URL.Host = raw.config.Address + } + } + proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director}) + defer proxy.Close() + + // Make another client that points at the proxy instead of the real + // Consul server. + config := raw.config + config.Address = proxy.URL[7:] // Strip off "http://". + c, err := NewClient(&config) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Set up a lock with retries enabled. + opts := &LockOptions{ + Key: "test/lock", + SessionTTL: "60s", + MonitorRetries: 3, + } + lock, err := c.LockOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the default got set. + if lock.opts.MonitorRetryTime != DefaultMonitorRetryTime { + t.Fatalf("bad: %d", lock.opts.MonitorRetryTime) + } + + // Now set a custom time for the test. + opts.MonitorRetryTime = 250 * time.Millisecond + lock, err = c.LockOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + if lock.opts.MonitorRetryTime != 250*time.Millisecond { + t.Fatalf("bad: %d", lock.opts.MonitorRetryTime) + } + + // Should get the lock. + leaderCh, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if leaderCh == nil { + t.Fatalf("not leader") + } + + // Poke the key using the raw client to force the monitor to wake up + // and check the lock again. This time we will return errors for some + // of the responses. + mutex.Lock() + errors = 2 + mutex.Unlock() + pair, _, err := raw.KV().Get("test/lock", &QueryOptions{}) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(5 * opts.MonitorRetryTime) + + // Should still be the leader. + select { + case <-leaderCh: + t.Fatalf("should be leader") + default: + } + + // Now return an overwhelming number of errors. + mutex.Lock() + errors = 10 + mutex.Unlock() + if _, err := raw.KV().Put(pair, &WriteOptions{}); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(5 * opts.MonitorRetryTime) + + // Should lose leadership. + select { + case <-leaderCh: + case <-time.After(time.Second): + t.Fatalf("should not be leader") + } +}