consul: Disable tombstones as follower
This commit is contained in:
parent
4f6f5ae6f0
commit
200b348f69
|
@ -50,16 +50,15 @@ func (s *Server) monitorLeadership() {
|
|||
// leaderLoop runs as long as we are the leader to run various
|
||||
// maintence activities
|
||||
func (s *Server) leaderLoop(stopCh chan struct{}) {
|
||||
// Ensure we revoke leadership on stepdown
|
||||
defer s.revokeLeadership()
|
||||
|
||||
// Fire a user event indicating a new leader
|
||||
payload := []byte(s.config.NodeName)
|
||||
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
|
||||
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
|
||||
}
|
||||
|
||||
// Clear the session timers on either shutdown or step down, since we
|
||||
// are no longer responsible for session expirations.
|
||||
defer s.clearAllSessionTimers()
|
||||
|
||||
// Reconcile channel is only used once initial reconcile
|
||||
// has succeeded
|
||||
var reconcileCh chan serf.Member
|
||||
|
@ -126,7 +125,7 @@ func (s *Server) establishLeadership() error {
|
|||
// Hint the tombstone expiration timer. When we freshly establish leadership
|
||||
// we become the authoritative timer, and so we need to start the clock
|
||||
// on any pending GC events.
|
||||
s.tombstoneGC.Reset()
|
||||
s.tombstoneGC.SetEnabled(true)
|
||||
lastIndex := s.raft.LastIndex()
|
||||
s.tombstoneGC.Hint(lastIndex)
|
||||
s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex)
|
||||
|
@ -154,6 +153,21 @@ func (s *Server) establishLeadership() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// revokeLeadership is invoked once we step down as leader.
|
||||
// This is used to cleanup any state that may be specific to a leader.
|
||||
func (s *Server) revokeLeadership() error {
|
||||
// Disable the tombstone GC, since it is only useful as a leader
|
||||
s.tombstoneGC.SetEnabled(false)
|
||||
|
||||
// Clear the session timers on either shutdown or step down, since we
|
||||
// are no longer responsible for session expirations.
|
||||
if err := s.clearAllSessionTimers(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// initializeACL is used to setup the ACLs if we are the leader
|
||||
// and need to do this.
|
||||
func (s *Server) initializeACL() error {
|
||||
|
|
|
@ -23,13 +23,18 @@ type TombstoneGC struct {
|
|||
ttl time.Duration
|
||||
granularity time.Duration
|
||||
|
||||
// enabled controls if we actually setup any timers.
|
||||
enabled bool
|
||||
|
||||
// expires maps the time of expiration to the highest
|
||||
// tombstone value that should be expired.
|
||||
expires map[time.Time]*expireInterval
|
||||
expiresLock sync.Mutex
|
||||
expires map[time.Time]*expireInterval
|
||||
|
||||
// expireCh is used to stream expiration
|
||||
expireCh chan uint64
|
||||
|
||||
// lock is used to ensure safe access to all the fields
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// expireInterval is used to track the maximum index
|
||||
|
@ -53,6 +58,7 @@ func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) {
|
|||
t := &TombstoneGC{
|
||||
ttl: ttl,
|
||||
granularity: granularity,
|
||||
enabled: false,
|
||||
expires: make(map[time.Time]*expireInterval),
|
||||
expireCh: make(chan uint64, 1),
|
||||
}
|
||||
|
@ -65,14 +71,25 @@ func (t *TombstoneGC) ExpireCh() <-chan uint64 {
|
|||
return t.expireCh
|
||||
}
|
||||
|
||||
// Reset is used to clear the TTL timers
|
||||
func (t *TombstoneGC) Reset() {
|
||||
t.expiresLock.Lock()
|
||||
defer t.expiresLock.Unlock()
|
||||
for _, exp := range t.expires {
|
||||
exp.timer.Stop()
|
||||
// SetEnabled is used to control if the tombstone GC is
|
||||
// enabled. Should only be enabled by the leader node.
|
||||
func (t *TombstoneGC) SetEnabled(enabled bool) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if enabled == t.enabled {
|
||||
return
|
||||
}
|
||||
t.expires = make(map[time.Time]*expireInterval)
|
||||
|
||||
// Stop all the timers and clear
|
||||
if !enabled {
|
||||
for _, exp := range t.expires {
|
||||
exp.timer.Stop()
|
||||
}
|
||||
t.expires = make(map[time.Time]*expireInterval)
|
||||
}
|
||||
|
||||
// Update the status
|
||||
t.enabled = enabled
|
||||
}
|
||||
|
||||
// Hint is used to indicate that keys at the given index have been
|
||||
|
@ -80,8 +97,11 @@ func (t *TombstoneGC) Reset() {
|
|||
func (t *TombstoneGC) Hint(index uint64) {
|
||||
expires := t.nextExpires()
|
||||
|
||||
t.expiresLock.Lock()
|
||||
defer t.expiresLock.Unlock()
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if !t.enabled {
|
||||
return
|
||||
}
|
||||
|
||||
// Check for an existing expiration timer
|
||||
exp, ok := t.expires[expires]
|
||||
|
@ -104,8 +124,8 @@ func (t *TombstoneGC) Hint(index uint64) {
|
|||
|
||||
// PendingExpiration is used to check if any expirations are pending
|
||||
func (t *TombstoneGC) PendingExpiration() bool {
|
||||
t.expiresLock.Lock()
|
||||
defer t.expiresLock.Unlock()
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
return len(t.expires) > 0
|
||||
}
|
||||
|
||||
|
@ -120,10 +140,10 @@ func (t *TombstoneGC) nextExpires() time.Time {
|
|||
// expireTime is used to expire the entries at the given time
|
||||
func (t *TombstoneGC) expireTime(expires time.Time) {
|
||||
// Get the maximum index and clear the entry
|
||||
t.expiresLock.Lock()
|
||||
t.lock.Lock()
|
||||
exp := t.expires[expires]
|
||||
delete(t.expires, expires)
|
||||
t.expiresLock.Unlock()
|
||||
t.lock.Unlock()
|
||||
|
||||
// Notify the expires channel
|
||||
t.expireCh <- exp.maxIndex
|
||||
|
|
|
@ -29,6 +29,7 @@ func TestTombstoneGC(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("should fail")
|
||||
}
|
||||
gc.SetEnabled(true)
|
||||
|
||||
if gc.PendingExpiration() {
|
||||
t.Fatalf("should not be pending")
|
||||
|
@ -82,13 +83,14 @@ func TestTombstoneGC_Expire(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("should fail")
|
||||
}
|
||||
gc.SetEnabled(true)
|
||||
|
||||
if gc.PendingExpiration() {
|
||||
t.Fatalf("should not be pending")
|
||||
}
|
||||
|
||||
gc.Hint(100)
|
||||
gc.Reset()
|
||||
gc.SetEnabled(false)
|
||||
|
||||
if gc.PendingExpiration() {
|
||||
t.Fatalf("should not be pending")
|
||||
|
|
Loading…
Reference in a new issue