package api

import (
	"fmt"
	"sync"
	"time"
)

const (
	// DefaultLockSessionName is the Session Name we assign if none is provided
	DefaultLockSessionName = "Consul API Lock"

	// DefaultLockSessionTTL is the default session TTL if no Session is provided
	// when creating a new Lock. This is used because we do not have any
	// other check to depend upon.
	DefaultLockSessionTTL = "15s"

	// DefaultLockWaitTime is how long we block for at a time to check if lock
	// acquisition is possible. This affects the minimum time it takes to cancel
	// a Lock acquisition.
	DefaultLockWaitTime = 15 * time.Second

	// DefaultLockRetryTime is how long we wait after a failed lock acquisition
	// before attempting to do the lock again. This is so that once a lock-delay
	// is in effect, we do not hot loop retrying the acquisition.
	DefaultLockRetryTime = 5 * time.Second
)

var (
	// ErrLockHeld is returned if we attempt to double lock
	ErrLockHeld = fmt.Errorf("Lock already held")

	// ErrLockNotHeld is returned if we attempt to unlock a lock
	// that we do not hold.
	ErrLockNotHeld = fmt.Errorf("Lock not held")
)

// Lock is used to implement client-side leader election. It follows the
// algorithm as described here: https://consul.io/docs/guides/leader-election.html.
type Lock struct {
	// c is the client used for the KV and Session calls.
	c *Client
	// opts holds the options the lock was created with.
	opts *LockOptions

	// isHeld is true between a successful Lock and the following Unlock.
	isHeld bool
	// sessionRenew is closed to stop the session renewal goroutine. It is
	// only set when Lock created the session itself.
	sessionRenew chan struct{}
	// lockSession is the ID of the session used for the current lock attempt.
	lockSession string
	// l serializes Lock and Unlock.
	l sync.Mutex
}

// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
	Key         string // Must be set and have write permissions
	Value       []byte // Optional, value to associate with the lock
	Session     string // Optional, created if not specified
	SessionName string // Optional, defaults to DefaultLockSessionName
	SessionTTL  string // Optional, defaults to DefaultLockSessionTTL
}

// LockKey returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockKey(key string) (*Lock, error) { opts := &LockOptions{ Key: key, } return c.LockOpts(opts) } // LockOpts returns a handle to a lock struct which can be used // to acquire and release the mutex. The key used must have // write permissions. func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) { if opts.Key == "" { return nil, fmt.Errorf("missing key") } if opts.SessionName == "" { opts.SessionName = DefaultLockSessionName } if opts.SessionTTL == "" { opts.SessionTTL = DefaultLockSessionTTL } else { if _, err := time.ParseDuration(opts.SessionTTL); err != nil { return nil, fmt.Errorf("invalid SessionTTL: %v", err) } } l := &Lock{ c: c, opts: opts, } return l, nil } // Lock attempts to acquire the lock and blocks while doing so. // Providing a non-nil stopCh can be used to abort the lock attempt. // Returns a channel that is closed if our lock is lost or an error. // This channel could be closed at any time due to session invalidation, // communication errors, operator intervention, etc. It is NOT safe to // assume that the lock is held until Unlock() unless the Session is specifically // created without any associated health checks. By default Consul sessions // prefer liveness over safety and an application must be able to handle // the lock being lost. 
func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) { // Hold the lock as we try to acquire l.l.Lock() defer l.l.Unlock() // Check if we already hold the lock if l.isHeld { return nil, ErrLockHeld } // Check if we need to create a session first l.lockSession = l.opts.Session if l.lockSession == "" { if s, err := l.createSession(); err != nil { return nil, fmt.Errorf("failed to create session: %v", err) } else { l.sessionRenew = make(chan struct{}) l.lockSession = s go l.renewSession(s, l.sessionRenew) // If we fail to acquire the lock, cleanup the session defer func() { if !l.isHeld { close(l.sessionRenew) l.sessionRenew = nil } }() } } // Setup the query options kv := l.c.KV() qOpts := &QueryOptions{ WaitTime: DefaultLockWaitTime, } WAIT: // Check if we should quit select { case <-stopCh: return nil, nil default: } // Look for an existing lock, blocking until not taken pair, meta, err := kv.Get(l.opts.Key, qOpts) if err != nil { return nil, fmt.Errorf("failed to read lock: %v", err) } if pair != nil && pair.Session != "" { qOpts.WaitIndex = meta.LastIndex goto WAIT } // Try to acquire the lock lockEnt := l.lockEntry(l.lockSession) locked, _, err := kv.Acquire(lockEnt, nil) if err != nil { return nil, fmt.Errorf("failed to acquire lock: %v", err) } // Handle the case of not getting the lock if !locked { select { case <-time.After(DefaultLockRetryTime): goto WAIT case <-stopCh: return nil, nil } } // Watch to ensure we maintain leadership leaderCh := make(chan struct{}) go l.monitorLock(l.lockSession, leaderCh) // Set that we own the lock l.isHeld = true // Locked! All done return leaderCh, nil } // Unlock released the lock. It is an error to call this // if the lock is not currently held. 
func (l *Lock) Unlock() error { // Hold the lock as we try to release l.l.Lock() defer l.l.Unlock() // Ensure the lock is actually held if !l.isHeld { return ErrLockNotHeld } // Set that we no longer own the lock l.isHeld = false // Stop the session renew if l.sessionRenew != nil { defer func() { close(l.sessionRenew) l.sessionRenew = nil }() } // Get the lock entry, and clear the lock session lockEnt := l.lockEntry(l.lockSession) l.lockSession = "" // Release the lock explicitly kv := l.c.KV() _, _, err := kv.Release(lockEnt, nil) if err != nil { return fmt.Errorf("failed to release lock: %v", err) } return nil } // createSession is used to create a new managed session func (l *Lock) createSession() (string, error) { session := l.c.Session() se := &SessionEntry{ Name: l.opts.SessionName, TTL: l.opts.SessionTTL, } id, _, err := session.Create(se, nil) if err != nil { return "", err } return id, nil } // lockEntry returns a formatted KVPair for the lock func (l *Lock) lockEntry(session string) *KVPair { return &KVPair{ Key: l.opts.Key, Value: l.opts.Value, Session: session, } } // renewSession is a long running routine that maintians a session // by doing a periodic Session renewal. func (l *Lock) renewSession(id string, doneCh chan struct{}) { session := l.c.Session() ttl, _ := time.ParseDuration(l.opts.SessionTTL) for { select { case <-time.After(ttl / 2): entry, _, err := session.Renew(id, nil) if err != nil || entry == nil { return } // Handle the server updating the TTL ttl, _ = time.ParseDuration(entry.TTL) case <-doneCh: // Attempt a session destroy session.Destroy(id, nil) return } } } // monitorLock is a long running routine to monitor a lock ownership // It closes the stopCh if we lose our leadership. 
func (l *Lock) monitorLock(session string, stopCh chan struct{}) { defer close(stopCh) kv := l.c.KV() opts := &QueryOptions{RequireConsistent: true} WAIT: pair, meta, err := kv.Get(l.opts.Key, opts) if err != nil { return } if pair != nil && pair.Session == session { opts.WaitIndex = meta.LastIndex goto WAIT } }