consul: Generate a raft operation to reap tombstones
This commit is contained in:
parent
02e984e4c4
commit
8681d913ba
|
@ -85,6 +85,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
|
|||
return c.applySessionOperation(buf[1:], log.Index)
|
||||
case structs.ACLRequestType:
|
||||
return c.applyACLOperation(buf[1:], log.Index)
|
||||
case structs.TombstoneReapRequestType:
|
||||
return c.applyTombstoneReapOperation(buf[1:], log.Index)
|
||||
default:
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
}
|
||||
|
@ -215,6 +217,14 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *consulFSM) applyTombstoneReapOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.TombstoneReapRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
defer func(start time.Time) {
|
||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
|
||||
|
|
|
@ -112,6 +112,8 @@ WAIT:
|
|||
goto RECONCILE
|
||||
case member := <-reconcileCh:
|
||||
s.reconcileMember(member)
|
||||
case index := <-s.tombstoneGC.ExpireCh():
|
||||
go s.reapTombstones(index)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -526,3 +528,22 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// reapTombstones is invoked by the current leader to manage garbage
|
||||
// collection of tombstones. When a key is deleted, we trigger a tombstone
|
||||
// GC clock. Once the expiration is reached, this routine is invoked
|
||||
// to clear all tombstones before this index. This must be replicated
|
||||
// through Raft to ensure consistency. We do this outside the leader loop
|
||||
// to avoid blocking.
|
||||
func (s *Server) reapTombstones(index uint64) {
|
||||
req := structs.TombstoneReapRequest{
|
||||
Datacenter: s.config.Datacenter,
|
||||
ReapIndex: index,
|
||||
WriteRequest: structs.WriteRequest{Token: s.config.ACLToken},
|
||||
}
|
||||
_, err := s.raftApply(structs.TombstoneReapRequestType, &req)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v",
|
||||
index, err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue