Merge pull request #577 from hashicorp/f-tombstone

Adding KV tombstones to fix non-monotonic index on deletes
This commit is contained in:
Armon Dadgar 2015-01-05 15:29:35 -08:00
commit 607735a5fe
15 changed files with 1293 additions and 102 deletions

View file

@ -158,6 +158,29 @@ type Config struct {
// "allow" can be used to allow all requests. This is not recommended. // "allow" can be used to allow all requests. This is not recommended.
ACLDownPolicy string ACLDownPolicy string
// TombstoneTTL is used to control how long KV tombstones are retained.
// This provides a window of time where the X-Consul-Index is monotonic.
// Outside this window, the index may not be monotonic. This is a result
// of a few trade offs:
// 1) The index is defined by the data view and not globally. This is a
// performance optimization that prevents any write from incrementing the
// index for all data views.
// 2) Tombstones are not kept indefinitely, since otherwise storage required
// is also monotonic. This prevents deletes from reducing the disk space
// used.
// In theory, neither of these are intrinsic limitations, however for the
// purposes of building a practical system, they are reaonable trade offs.
//
// It is also possible to set this to an incredibly long time, thereby
// simulating infinite retention. This is not recommended however.
//
TombstoneTTL time.Duration
// TombstoneTTLGranularity is used to control how granular the timers are
// for the Tombstone GC. This is used to batch the GC of many keys together
// to reduce overhead. It is unlikely a user would ever need to tune this.
TombstoneTTLGranularity time.Duration
// ServerUp callback can be used to trigger a notification that // ServerUp callback can be used to trigger a notification that
// a Consul server is now up and known about. // a Consul server is now up and known about.
ServerUp func() ServerUp func()
@ -205,17 +228,19 @@ func DefaultConfig() *Config {
} }
conf := &Config{ conf := &Config{
Datacenter: DefaultDC, Datacenter: DefaultDC,
NodeName: hostname, NodeName: hostname,
RPCAddr: DefaultRPCAddr, RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(), RaftConfig: raft.DefaultConfig(),
SerfLANConfig: serf.DefaultConfig(), SerfLANConfig: serf.DefaultConfig(),
SerfWANConfig: serf.DefaultConfig(), SerfWANConfig: serf.DefaultConfig(),
ReconcileInterval: 60 * time.Second, ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersionMax, ProtocolVersion: ProtocolVersionMax,
ACLTTL: 30 * time.Second, ACLTTL: 30 * time.Second,
ACLDefaultPolicy: "allow", ACLDefaultPolicy: "allow",
ACLDownPolicy: "extend-cache", ACLDownPolicy: "extend-cache",
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
} }
// Increase our reap interval to 3 days instead of 24h. // Increase our reap interval to 3 days instead of 24h.

View file

@ -8,6 +8,7 @@ import (
"log" "log"
"time" "time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
@ -21,6 +22,7 @@ type consulFSM struct {
logger *log.Logger logger *log.Logger
path string path string
state *StateStore state *StateStore
gc *TombstoneGC
} }
// consulSnapshot is used to provide a snapshot of the current // consulSnapshot is used to provide a snapshot of the current
@ -38,7 +40,7 @@ type snapshotHeader struct {
} }
// NewFSMPath is used to construct a new FSM with a blank state // NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create a temporary path for the state store // Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(path, "state") tmpPath, err := ioutil.TempDir(path, "state")
if err != nil { if err != nil {
@ -46,7 +48,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
} }
// Create a state store // Create a state store
state, err := NewStateStorePath(tmpPath, logOutput) state, err := NewStateStorePath(gc, tmpPath, logOutput)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -56,6 +58,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
logger: log.New(logOutput, "", log.LstdFlags), logger: log.New(logOutput, "", log.LstdFlags),
path: path, path: path,
state: state, state: state,
gc: gc,
} }
return fsm, nil return fsm, nil
} }
@ -83,6 +86,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applySessionOperation(buf[1:], log.Index) return c.applySessionOperation(buf[1:], log.Index)
case structs.ACLRequestType: case structs.ACLRequestType:
return c.applyACLOperation(buf[1:], log.Index) return c.applyACLOperation(buf[1:], log.Index)
case structs.TombstoneRequestType:
return c.applyTombstoneOperation(buf[1:], log.Index)
default: default:
panic(fmt.Errorf("failed to apply request: %#v", buf)) panic(fmt.Errorf("failed to apply request: %#v", buf))
} }
@ -97,6 +102,7 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
} }
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
// Apply all updates in a single transaction // Apply all updates in a single transaction
if err := c.state.EnsureRegistration(index, req); err != nil { if err := c.state.EnsureRegistration(index, req); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err) c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err)
@ -106,6 +112,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) in
} }
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
@ -136,6 +143,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
switch req.Op { switch req.Op {
case structs.KVSSet: case structs.KVSSet:
return c.state.KVSSet(index, &req.DirEnt) return c.state.KVSSet(index, &req.DirEnt)
@ -176,6 +184,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
switch req.Op { switch req.Op {
case structs.SessionCreate: case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil { if err := c.state.SessionCreate(index, &req.Session); err != nil {
@ -196,6 +205,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now())
switch req.Op { switch req.Op {
case structs.ACLForceSet: case structs.ACLForceSet:
fallthrough fallthrough
@ -213,6 +223,21 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
} }
} }
func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
var req structs.TombstoneRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) { defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
@ -236,7 +261,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
} }
// Create a new state store // Create a new state store
state, err := NewStateStorePath(tmpPath, c.logOutput) state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
if err != nil { if err != nil {
return err return err
} }
@ -299,6 +324,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err return err
} }
case structs.TombstoneRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.TombstoneRestore(&req); err != nil {
return err
}
default: default:
return fmt.Errorf("Unrecognized msg type: %v", msgType) return fmt.Errorf("Unrecognized msg type: %v", msgType)
} }
@ -308,6 +342,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
} }
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
// Register the nodes // Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle) encoder := codec.NewEncoder(sink, msgpackHandle)
@ -339,6 +374,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel() sink.Cancel()
return err return err
} }
if err := s.persistTombstones(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil return nil
} }
@ -444,6 +484,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
} }
} }
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.TombstoneDump(streamCh); err != nil {
errorCh <- err
}
}()
for {
select {
case raw := <-streamCh:
if raw == nil {
return nil
}
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if err := encoder.Encode(raw); err != nil {
return err
}
case err := <-errorCh:
return err
}
}
}
func (s *consulSnapshot) Release() { func (s *consulSnapshot) Release() {
s.state.Close() s.state.Close()
} }

View file

@ -42,7 +42,7 @@ func TestFSM_RegisterNode(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -82,7 +82,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -139,7 +139,7 @@ func TestFSM_DeregisterService(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -198,7 +198,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -257,7 +257,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -328,7 +328,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -357,6 +357,12 @@ func TestFSM_SnapshotRestore(t *testing.T) {
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"} acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
fsm.state.ACLSet(10, acl) fsm.state.ACLSet(10, acl)
fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()
if err != nil { if err != nil {
@ -372,7 +378,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
} }
// Try to restore on a new FSM // Try to restore on a new FSM
fsm2, err := NewFSM(path, os.Stderr) fsm2, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -446,6 +452,15 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if idx <= 1 { if idx <= 1 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Verify tombstones are restored
_, res, err := fsm.state.tombstoneTable.Get("id", "/remove")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 1 {
t.Fatalf("bad: %v", res)
}
} }
func TestFSM_KVSSet(t *testing.T) { func TestFSM_KVSSet(t *testing.T) {
@ -453,7 +468,7 @@ func TestFSM_KVSSet(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -492,7 +507,7 @@ func TestFSM_KVSDelete(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -542,7 +557,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -593,7 +608,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -654,7 +669,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -738,7 +753,7 @@ func TestFSM_KVSLock(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -787,7 +802,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -854,7 +869,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -925,3 +940,46 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
t.Fatalf("should be destroyed") t.Fatalf("should be destroyed")
} }
} }
func TestFSM_TombstoneReap(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create some tombstones
fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
// Create a new reap request
req := structs.TombstoneRequest{
Datacenter: "dc1",
Op: structs.TombstoneReap,
ReapIndex: 12,
}
buf, err := structs.Encode(structs.TombstoneRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Verify the tombstones are gone
_, res, err := fsm.state.tombstoneTable.Get("id")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 0 {
t.Fatalf("bad: %v", res)
}
}

View file

@ -15,7 +15,7 @@ func TestHealthCheckRace(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
fsm, err := NewFSM(path, os.Stderr) fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }

View file

@ -135,13 +135,14 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
&reply.QueryMeta, &reply.QueryMeta,
state.QueryTables("KVSList"), state.QueryTables("KVSList"),
func() error { func() error {
index, ent, err := state.KVSList(args.Key) tombIndex, index, ent, err := state.KVSList(args.Key)
if err != nil { if err != nil {
return err return err
} }
if acl != nil { if acl != nil {
ent = FilterDirEnt(acl, ent) ent = FilterDirEnt(acl, ent)
} }
if len(ent) == 0 { if len(ent) == 0 {
// Must provide non-zero index to prevent blocking // Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals) // Index 1 is impossible anyways (due to Raft internals)
@ -150,7 +151,6 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
} else { } else {
reply.Index = index reply.Index = index
} }
reply.Entries = nil
} else { } else {
// Determine the maximum affected index // Determine the maximum affected index
var maxIndex uint64 var maxIndex uint64
@ -159,7 +159,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
maxIndex = e.ModifyIndex maxIndex = e.ModifyIndex
} }
} }
if tombIndex > maxIndex {
maxIndex = tombIndex
}
reply.Index = maxIndex reply.Index = maxIndex
reply.Entries = ent reply.Entries = ent
} }

View file

@ -50,16 +50,15 @@ func (s *Server) monitorLeadership() {
// leaderLoop runs as long as we are the leader to run various // leaderLoop runs as long as we are the leader to run various
// maintence activities // maintence activities
func (s *Server) leaderLoop(stopCh chan struct{}) { func (s *Server) leaderLoop(stopCh chan struct{}) {
// Ensure we revoke leadership on stepdown
defer s.revokeLeadership()
// Fire a user event indicating a new leader // Fire a user event indicating a new leader
payload := []byte(s.config.NodeName) payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err)
} }
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
defer s.clearAllSessionTimers()
// Reconcile channel is only used once initial reconcile // Reconcile channel is only used once initial reconcile
// has succeeded // has succeeded
var reconcileCh chan serf.Member var reconcileCh chan serf.Member
@ -112,6 +111,8 @@ WAIT:
goto RECONCILE goto RECONCILE
case member := <-reconcileCh: case member := <-reconcileCh:
s.reconcileMember(member) s.reconcileMember(member)
case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index)
} }
} }
} }
@ -121,6 +122,14 @@ WAIT:
// previously inflight transactions have been commited and that our // previously inflight transactions have been commited and that our
// state is up-to-date. // state is up-to-date.
func (s *Server) establishLeadership() error { func (s *Server) establishLeadership() error {
// Hint the tombstone expiration timer. When we freshly establish leadership
// we become the authoritative timer, and so we need to start the clock
// on any pending GC events.
s.tombstoneGC.SetEnabled(true)
lastIndex := s.raft.LastIndex()
s.tombstoneGC.Hint(lastIndex)
s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex)
// Setup ACLs if we are the leader and need to // Setup ACLs if we are the leader and need to
if err := s.initializeACL(); err != nil { if err := s.initializeACL(); err != nil {
s.logger.Printf("[ERR] consul: ACL initialization failed: %v", err) s.logger.Printf("[ERR] consul: ACL initialization failed: %v", err)
@ -144,6 +153,21 @@ func (s *Server) establishLeadership() error {
return nil return nil
} }
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
// Disable the tombstone GC, since it is only useful as a leader
s.tombstoneGC.SetEnabled(false)
// Clear the session timers on either shutdown or step down, since we
// are no longer responsible for session expirations.
if err := s.clearAllSessionTimers(); err != nil {
s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err)
return err
}
return nil
}
// initializeACL is used to setup the ACLs if we are the leader // initializeACL is used to setup the ACLs if we are the leader
// and need to do this. // and need to do this.
func (s *Server) initializeACL() error { func (s *Server) initializeACL() error {
@ -518,3 +542,24 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
} }
return nil return nil
} }
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func (s *Server) reapTombstones(index uint64) {
defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now())
req := structs.TombstoneRequest{
Datacenter: s.config.Datacenter,
Op: structs.TombstoneReap,
ReapIndex: index,
WriteRequest: structs.WriteRequest{Token: s.config.ACLToken},
}
_, err := s.raftApply(structs.TombstoneRequestType, &req)
if err != nil {
s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v",
index, err)
}
}

View file

@ -436,3 +436,127 @@ func TestLeader_MultiBootstrap(t *testing.T) {
} }
} }
} }
func TestLeader_TombstoneGC_Reset(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
// Check that the leader has a pending GC expiration
if !leader.tombstoneGC.PendingExpiration() {
t.Fatalf("should have pending expiration")
}
// Kill the leader
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
// Check that the new leader has a pending GC expiration
testutil.WaitForResult(func() (bool, error) {
return leader.tombstoneGC.PendingExpiration(), nil
}, func(err error) {
t.Fatalf("should have pending expiration")
})
}
func TestLeader_ReapTombstones(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.TombstoneTTL = 50 * time.Millisecond
c.TombstoneTTLGranularity = 10 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
testutil.WaitForLeader(t, client.Call, "dc1")
// Create a KV entry
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Value: []byte("test"),
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the KV entry (tombstoned)
arg.Op = structs.KVSDelete
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a tombstone
_, res, err := s1.fsm.State().tombstoneTable.Get("id")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) == 0 {
t.Fatalf("missing tombstones")
}
// Check that the new leader has a pending GC expiration
testutil.WaitForResult(func() (bool, error) {
_, res, err := s1.fsm.State().tombstoneTable.Get("id")
return len(res) == 0, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

View file

@ -389,10 +389,32 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac
// Accumulate the results // Accumulate the results
var results []interface{} var results []interface{}
err = idx.iterate(tx, key, func(encRowId, res []byte) bool { err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
obj := t.Decoder(res) obj := t.Decoder(res)
results = append(results, obj) results = append(results, obj)
return false return false, false
})
return results, err
}
// GetTxnLimit is like GetTxn limits the maximum number of
// rows it will return
func (t *MDBTable) GetTxnLimit(tx *MDBTxn, limit int, index string, parts ...string) ([]interface{}, error) {
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return nil, err
}
// Accumulate the results
var results []interface{}
num := 0
err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
num++
obj := t.Decoder(res)
results = append(results, obj)
return false, num == limit
}) })
return results, err return results, err
@ -412,10 +434,10 @@ func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string
} }
// Stream the results // Stream the results
err = idx.iterate(tx, key, func(encRowId, res []byte) bool { err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
obj := t.Decoder(res) obj := t.Decoder(res)
stream <- obj stream <- obj
return false return false, false
}) })
return err return err
@ -508,7 +530,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (
}() }()
// Delete everything as we iterate // Delete everything as we iterate
err = idx.iterate(tx, key, func(encRowId, res []byte) bool { err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) {
// Get the object // Get the object
obj := t.Decoder(res) obj := t.Decoder(res)
@ -542,7 +564,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (
// Delete the object // Delete the object
num++ num++
return true return true, false
}) })
if err != nil { if err != nil {
return 0, err return 0, err
@ -644,7 +666,7 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte {
// and invoking the cb with each row. We dereference the rowid, // and invoking the cb with each row. We dereference the rowid,
// and only return the object row // and only return the object row
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
cb func(encRowId, res []byte) bool) error { cb func(encRowId, res []byte) (bool, bool)) error {
table := tx.dbis[i.table.Name] table := tx.dbis[i.table.Name]
// If virtual, use the correct DBI // If virtual, use the correct DBI
@ -667,8 +689,9 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
var key, encRowId, objBytes []byte var key, encRowId, objBytes []byte
first := true first := true
shouldStop := false
shouldDelete := false shouldDelete := false
for { for !shouldStop {
if first && len(prefix) > 0 { if first && len(prefix) > 0 {
first = false first = false
key, encRowId, err = cursor.Get(prefix, mdb.SET_RANGE) key, encRowId, err = cursor.Get(prefix, mdb.SET_RANGE)
@ -708,7 +731,8 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
} }
// Invoke the cb // Invoke the cb
if shouldDelete = cb(encRowId, objBytes); shouldDelete { shouldDelete, shouldStop = cb(encRowId, objBytes)
if shouldDelete {
if err := cursor.Del(0); err != nil { if err := cursor.Del(0); err != nil {
return fmt.Errorf("delete failed: %v", err) return fmt.Errorf("delete failed: %v", err)
} }

View file

@ -2,12 +2,13 @@ package consul
import ( import (
"bytes" "bytes"
"github.com/armon/gomdb"
"github.com/hashicorp/go-msgpack/codec"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect" "reflect"
"testing" "testing"
"github.com/armon/gomdb"
"github.com/hashicorp/go-msgpack/codec"
) )
type MockData struct { type MockData struct {
@ -970,3 +971,78 @@ func TestMDBTableStream(t *testing.T) {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
} }
func TestMDBTableGetTxnLimit(t *testing.T) {
dir, env := testMDBEnv(t)
defer os.RemoveAll(dir)
defer env.Close()
table := &MDBTable{
Env: env,
Name: "test",
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Key"},
},
"name": &MDBIndex{
Fields: []string{"First", "Last"},
},
"country": &MDBIndex{
Fields: []string{"Country"},
},
},
Encoder: MockEncoder,
Decoder: MockDecoder,
}
if err := table.Init(); err != nil {
t.Fatalf("err: %v", err)
}
objs := []*MockData{
&MockData{
Key: "1",
First: "Kevin",
Last: "Smith",
Country: "USA",
},
&MockData{
Key: "2",
First: "Kevin",
Last: "Wang",
Country: "USA",
},
&MockData{
Key: "3",
First: "Bernardo",
Last: "Torres",
Country: "Mexico",
},
}
// Insert some mock objects
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
t.Fatalf("err: %v", err)
}
}
// Start a readonly txn
tx, err := table.StartTxn(true, nil)
if err != nil {
panic(err)
}
defer tx.Abort()
// Verify with some gets
res, err := table.GetTxnLimit(tx, 2, "id")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 2 {
t.Fatalf("expect 2 result: %#v", res)
}
}

View file

@ -134,6 +134,10 @@ type Server struct {
sessionTimers map[string]*time.Timer sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex sessionTimersLock sync.Mutex
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *TombstoneGC
shutdown bool shutdown bool
shutdownCh chan struct{} shutdownCh chan struct{}
shutdownLock sync.Mutex shutdownLock sync.Mutex
@ -189,6 +193,12 @@ func NewServer(config *Config) (*Server, error) {
// Create a logger // Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags) logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the tombstone GC
gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
if err != nil {
return nil, err
}
// Create server // Create server
s := &Server{ s := &Server{
config: config, config: config,
@ -201,6 +211,7 @@ func NewServer(config *Config) (*Server, error) {
remoteConsuls: make(map[string][]*serverParts), remoteConsuls: make(map[string][]*serverParts),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
tombstoneGC: gc,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
@ -320,7 +331,7 @@ func (s *Server) setupRaft() error {
// Create the FSM // Create the FSM
var err error var err error
s.fsm, err = NewFSM(statePath, s.config.LogOutput) s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
if err != nil { if err != nil {
return err return err
} }

View file

@ -20,6 +20,7 @@ const (
dbServices = "services" dbServices = "services"
dbChecks = "checks" dbChecks = "checks"
dbKVS = "kvs" dbKVS = "kvs"
dbTombstone = "tombstones"
dbSessions = "sessions" dbSessions = "sessions"
dbSessionChecks = "sessionChecks" dbSessionChecks = "sessionChecks"
dbACLs = "acls" dbACLs = "acls"
@ -54,6 +55,7 @@ type StateStore struct {
serviceTable *MDBTable serviceTable *MDBTable
checkTable *MDBTable checkTable *MDBTable
kvsTable *MDBTable kvsTable *MDBTable
tombstoneTable *MDBTable
sessionTable *MDBTable sessionTable *MDBTable
sessionCheckTable *MDBTable sessionCheckTable *MDBTable
aclTable *MDBTable aclTable *MDBTable
@ -76,6 +78,10 @@ type StateStore struct {
// is never questioned. // is never questioned.
lockDelay map[string]time.Time lockDelay map[string]time.Time
lockDelayLock sync.RWMutex lockDelayLock sync.RWMutex
// GC is when we create tombstones to track their time-to-live.
// The GC is consumed upstream to manage clearing of tombstones.
gc *TombstoneGC
} }
// StateSnapshot is used to provide a point-in-time snapshot // StateSnapshot is used to provide a point-in-time snapshot
@ -102,18 +108,18 @@ func (s *StateSnapshot) Close() error {
} }
// NewStateStore is used to create a new state store // NewStateStore is used to create a new state store
func NewStateStore(logOutput io.Writer) (*StateStore, error) { func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) {
// Create a new temp dir // Create a new temp dir
path, err := ioutil.TempDir("", "consul") path, err := ioutil.TempDir("", "consul")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewStateStorePath(path, logOutput) return NewStateStorePath(gc, path, logOutput)
} }
// NewStateStorePath is used to create a new state store at a given path // NewStateStorePath is used to create a new state store at a given path
// The path is cleared on closing. // The path is cleared on closing.
func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) { func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) {
// Open the env // Open the env
env, err := mdb.NewEnv() env, err := mdb.NewEnv()
if err != nil { if err != nil {
@ -126,6 +132,7 @@ func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) {
env: env, env: env,
watch: make(map[*MDBTable]*NotifyGroup), watch: make(map[*MDBTable]*NotifyGroup),
lockDelay: make(map[string]time.Time), lockDelay: make(map[string]time.Time),
gc: gc,
} }
// Ensure we can initialize // Ensure we can initialize
@ -283,6 +290,29 @@ func (s *StateStore) initialize() error {
}, },
} }
s.tombstoneTable = &MDBTable{
Name: dbTombstone,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Key"},
},
"id_prefix": &MDBIndex{
Virtual: true,
RealIndex: "id",
Fields: []string{"Key"},
IdxFunc: DefaultIndexPrefixFunc,
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.DirEntry)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
s.sessionTable = &MDBTable{ s.sessionTable = &MDBTable{
Name: dbSessions, Name: dbSessions,
Indexes: map[string]*MDBIndex{ Indexes: map[string]*MDBIndex{
@ -340,7 +370,8 @@ func (s *StateStore) initialize() error {
// Store the set of tables // Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
s.kvsTable, s.sessionTable, s.sessionCheckTable, s.aclTable} s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable,
s.aclTable}
for _, table := range s.tables { for _, table := range s.tables {
table.Env = s.env table.Env = s.env
table.Encoder = encoder table.Encoder = encoder
@ -1084,18 +1115,45 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
} }
// KVSList is used to list all KV entries with a prefix // KVSList is used to list all KV entries with a prefix
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) {
idx, res, err := s.kvsTable.Get("id_prefix", prefix) tables := MDBTables{s.kvsTable, s.tombstoneTable}
tx, err := tables.StartTxn(true)
if err != nil {
return 0, 0, nil, err
}
defer tx.Abort()
idx, err := tables.LastIndexTxn(tx)
if err != nil {
return 0, 0, nil, err
}
res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix)
if err != nil {
return 0, 0, nil, err
}
ents := make(structs.DirEntries, len(res)) ents := make(structs.DirEntries, len(res))
for idx, r := range res { for idx, r := range res {
ents[idx] = r.(*structs.DirEntry) ents[idx] = r.(*structs.DirEntry)
} }
return idx, ents, err
// Check for the higest index in the tombstone table
var maxIndex uint64
res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix)
for _, r := range res {
ent := r.(*structs.DirEntry)
if ent.ModifyIndex > maxIndex {
maxIndex = ent.ModifyIndex
}
}
return maxIndex, idx, ents, err
} }
// KVSListKeys is used to list keys with a prefix, and up to a given seperator // KVSListKeys is used to list keys with a prefix, and up to a given seperator
func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) {
tx, err := s.kvsTable.StartTxn(true, nil) tables := MDBTables{s.kvsTable, s.tombstoneTable}
tx, err := tables.StartTxn(true)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -1115,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
// Aggregate the stream // Aggregate the stream
stream := make(chan interface{}, 128) stream := make(chan interface{}, 128)
streamTomb := make(chan interface{}, 128)
done := make(chan struct{}) done := make(chan struct{})
var keys []string var keys []string
var maxIndex uint64 var maxIndex uint64
@ -1148,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er
keys = append(keys, ent.Key) keys = append(keys, ent.Key)
} }
} }
// Handle the tombstones for any index updates
for raw := range streamTomb {
ent := raw.(*structs.DirEntry)
if ent.ModifyIndex > maxIndex {
maxIndex = ent.ModifyIndex
}
}
close(done) close(done)
}() }()
// Start the stream, and wait for completion // Start the stream, and wait for completion
err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil {
return 0, nil, err
}
if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil {
return 0, nil, err
}
<-done <-done
// Use the maxIndex if we have any keys // Use the maxIndex if we have any keys
if maxIndex != 0 { if maxIndex != 0 {
idx = maxIndex idx = maxIndex
} }
return idx, keys, err return idx, keys, nil
} }
// KVSDelete is used to delete a KVS entry // KVSDelete is used to delete a KVS entry
@ -1177,25 +1249,64 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
// kvsDeleteWithIndex does a delete with either the id or id_prefix // kvsDeleteWithIndex does a delete with either the id or id_prefix
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
// Start a new txn tx, err := s.tables.StartTxn(false)
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil { if err != nil {
return err return err
} }
defer tx.Abort() defer tx.Abort()
if err := s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...); err != nil {
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
if err != nil {
return err return err
} }
return tx.Commit()
}
// kvsDeleteWithIndexTxn does a delete within an existing transaction
func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error {
num := 0
for {
// Get some number of entries to delete
pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...)
if err != nil {
return err
}
// Create the tombstones and delete
for _, raw := range pairs {
ent := raw.(*structs.DirEntry)
ent.ModifyIndex = index // Update the index
ent.Value = nil // Reduce storage required
ent.Session = ""
if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil {
return err
}
if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil {
return err
} else if num != 1 {
return fmt.Errorf("Failed to delete key '%s'", ent.Key)
}
}
// Increment the total number
num += len(pairs)
if len(pairs) == 0 {
break
}
}
if num > 0 { if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err return err
} }
tx.Defer(func() { s.watch[s.kvsTable].Notify() }) tx.Defer(func() {
s.watch[s.kvsTable].Notify()
if s.gc != nil {
// If GC is configured, then we hint that this index
// required expiration.
s.gc.Hint(index)
}
})
} }
return tx.Commit() return nil
} }
// KVSCheckAndSet is used to perform an atomic check-and-set // KVSCheckAndSet is used to perform an atomic check-and-set
@ -1319,6 +1430,72 @@ func (s *StateStore) kvsSet(
return true, tx.Commit() return true, tx.Commit()
} }
// ReapTombstones is used to delete all the tombstones with a ModifyTime
// less than or equal to the given index. This is used to prevent unbounded
// storage growth of the tombstones.
func (s *StateStore) ReapTombstones(index uint64) error {
tx, err := s.tombstoneTable.StartTxn(false, nil)
if err != nil {
return fmt.Errorf("failed to start txn: %v", err)
}
defer tx.Abort()
// Scan the tombstone table for all the entries that are
// eligble for GC. This could be improved by indexing on
// ModifyTime and doing a less-than-equals scan, however
// we don't currently support numeric indexes internally.
// Luckily, this is a low frequency operation.
var toDelete []string
streamCh := make(chan interface{}, 128)
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
for raw := range streamCh {
ent := raw.(*structs.DirEntry)
if ent.ModifyIndex <= index {
toDelete = append(toDelete, ent.Key)
}
}
}()
if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil {
s.logger.Printf("[ERR] consul.state: failed to scan tombstones: %v", err)
return fmt.Errorf("failed to scan tombstones: %v", err)
}
<-doneCh
// Delete each tombstone
if len(toDelete) > 0 {
s.logger.Printf("[DEBUG] consul.state: reaping %d tombstones up to %d", len(toDelete), index)
}
for _, key := range toDelete {
num, err := s.tombstoneTable.DeleteTxn(tx, "id", key)
if err != nil {
s.logger.Printf("[ERR] consul.state: failed to delete tombstone: %v", err)
return fmt.Errorf("failed to delete tombstone: %v", err)
}
if num != 1 {
return fmt.Errorf("failed to delete tombstone '%s'", key)
}
}
return tx.Commit()
}
// TombstoneRestore is used to restore a tombstone.
// It should only be used when doing a restore.
func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error {
// Start a new txn
tx, err := s.tombstoneTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := s.tombstoneTable.InsertTxn(tx, d); err != nil {
return err
}
return tx.Commit()
}
// SessionCreate is used to create a new session. The // SessionCreate is used to create a new session. The
// ID will be populated on a successful return // ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
@ -1537,7 +1714,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
if session.Behavior == structs.SessionKeysDelete { if session.Behavior == structs.SessionKeysDelete {
// delete the keys held by the session // delete the keys held by the session
if err := s.deleteKeys(index, tx, id); err != nil { if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil {
return err return err
} }
@ -1618,23 +1795,6 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
return nil return nil
} }
// deleteKeys is used to delete all the keys created by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error {
num, err := s.kvsTable.DeleteTxn(tx, "session", id)
if err != nil {
return err
}
if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return nil
}
// ACLSet is used to create or update an ACL entry // ACLSet is used to create or update an ACL entry
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
// Check for an ID // Check for an ID
@ -1802,6 +1962,13 @@ func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
return s.store.kvsTable.StreamTxn(stream, s.tx, "id") return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
} }
// TombstoneDump is used to dump all tombstone entries. It takes a channel and streams
// back *struct.DirEntry objects. This will block and should be invoked
// in a goroutine.
func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error {
return s.store.tombstoneTable.StreamTxn(stream, s.tx, "id")
}
// SessionList is used to list all the open sessions // SessionList is used to list all the open sessions
func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { func (s *StateSnapshot) SessionList() ([]*structs.Session, error) {
res, err := s.store.sessionTable.GetTxn(s.tx, "id") res, err := s.store.sessionTable.GetTxn(s.tx, "id")

View file

@ -11,7 +11,7 @@ import (
) )
func testStateStore() (*StateStore, error) { func testStateStore() (*StateStore, error) {
return NewStateStore(os.Stderr) return NewStateStore(nil, os.Stderr)
} }
func TestEnsureRegistration(t *testing.T) { func TestEnsureRegistration(t *testing.T) {
@ -688,23 +688,32 @@ func TestStoreSnapshot(t *testing.T) {
if err := store.KVSSet(15, d); err != nil { if err := store.KVSSet(15, d); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
d = &structs.DirEntry{Key: "/web/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(16, d); err != nil {
t.Fatalf("err: %v", err)
}
// Create a tombstone
// TODO: Change to /web/c causes failure?
if err := store.KVSDelete(17, "/web/a"); err != nil {
t.Fatalf("err: %v", err)
}
// Add some sessions // Add some sessions
session := &structs.Session{ID: generateUUID(), Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(16, session); err != nil { if err := store.SessionCreate(18, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
session = &structs.Session{ID: generateUUID(), Node: "bar"} session = &structs.Session{ID: generateUUID(), Node: "bar"}
if err := store.SessionCreate(17, session); err != nil { if err := store.SessionCreate(19, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
d.Session = session.ID d.Session = session.ID
if ok, err := store.KVSLock(18, d); err != nil || !ok { if ok, err := store.KVSLock(20, d); err != nil || !ok {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
session = &structs.Session{ID: generateUUID(), Node: "bar", TTL: "60s"} session = &structs.Session{ID: generateUUID(), Node: "bar", TTL: "60s"}
if err := store.SessionCreate(19, session); err != nil { if err := store.SessionCreate(21, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -713,7 +722,7 @@ func TestStoreSnapshot(t *testing.T) {
Name: "User token", Name: "User token",
Type: structs.ACLTypeClient, Type: structs.ACLTypeClient,
} }
if err := store.ACLSet(20, a1); err != nil { if err := store.ACLSet(21, a1); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -722,7 +731,7 @@ func TestStoreSnapshot(t *testing.T) {
Name: "User token", Name: "User token",
Type: structs.ACLTypeClient, Type: structs.ACLTypeClient,
} }
if err := store.ACLSet(21, a2); err != nil { if err := store.ACLSet(22, a2); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -734,7 +743,7 @@ func TestStoreSnapshot(t *testing.T) {
defer snap.Close() defer snap.Close()
// Check the last nodes // Check the last nodes
if idx := snap.LastIndex(); idx != 21 { if idx := snap.LastIndex(); idx != 22 {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
@ -786,7 +795,29 @@ func TestStoreSnapshot(t *testing.T) {
} }
<-doneCh <-doneCh
if len(ents) != 2 { if len(ents) != 2 {
t.Fatalf("missing KVS entries!") t.Fatalf("missing KVS entries! %#v", ents)
}
// Check we have the tombstone entries
streamCh = make(chan interface{}, 64)
doneCh = make(chan struct{})
ents = nil
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.TombstoneDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 1 {
t.Fatalf("missing tombstone entries!")
} }
// Check there are 3 sessions // Check there are 3 sessions
@ -818,13 +849,13 @@ func TestStoreSnapshot(t *testing.T) {
} }
// Make some changes! // Make some changes!
if err := store.EnsureService(22, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { if err := store.EnsureService(23, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService(23, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { if err := store.EnsureService(24, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureNode(24, structs.Node{"baz", "127.0.0.3"}); err != nil { if err := store.EnsureNode(25, structs.Node{"baz", "127.0.0.3"}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
checkAfter := &structs.HealthCheck{ checkAfter := &structs.HealthCheck{
@ -834,16 +865,16 @@ func TestStoreSnapshot(t *testing.T) {
Status: structs.HealthCritical, Status: structs.HealthCritical,
ServiceID: "db", ServiceID: "db",
} }
if err := store.EnsureCheck(26, checkAfter); err != nil { if err := store.EnsureCheck(27, checkAfter); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.KVSDelete(26, "/web/b"); err != nil { if err := store.KVSDelete(28, "/web/b"); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Nuke an ACL // Nuke an ACL
if err := store.ACLDelete(27, a1.ID); err != nil { if err := store.ACLDelete(29, a1.ID); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -897,6 +928,28 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("missing KVS entries!") t.Fatalf("missing KVS entries!")
} }
// Check we have the tombstone entries
streamCh = make(chan interface{}, 64)
doneCh = make(chan struct{})
ents = nil
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.TombstoneDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 1 {
t.Fatalf("missing tombstone entries!")
}
// Check there are 3 sessions // Check there are 3 sessions
sessions, err = snap.SessionList() sessions, err = snap.SessionList()
if err != nil { if err != nil {
@ -1413,6 +1466,15 @@ func TestKVSDelete(t *testing.T) {
} }
defer store.Close() defer store.Close()
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("err: %v", err)
}
gc.SetEnabled(true)
store.gc = gc
// Create the entry // Create the entry
d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil { if err := store.KVSSet(1000, d); err != nil {
@ -1435,6 +1497,25 @@ func TestKVSDelete(t *testing.T) {
if d != nil { if d != nil {
t.Fatalf("bad: %v", d) t.Fatalf("bad: %v", d)
} }
// Check tombstone exists
_, res, err := store.tombstoneTable.Get("id", "/foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if res == nil || res[0].(*structs.DirEntry).ModifyIndex != 1020 {
t.Fatalf("bad: %#v", d)
}
// Check that we get a delete
select {
case idx := <-gc.ExpireCh():
if idx != 1020 {
t.Fatalf("bad %d", idx)
}
case <-time.After(20 * time.Millisecond):
t.Fatalf("should expire")
}
} }
func TestKVSCheckAndSet(t *testing.T) { func TestKVSCheckAndSet(t *testing.T) {
@ -1508,7 +1589,7 @@ func TestKVS_List(t *testing.T) {
defer store.Close() defer store.Close()
// Should not exist // Should not exist
idx, ents, err := store.KVSList("/web") _, idx, ents, err := store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1534,7 +1615,7 @@ func TestKVS_List(t *testing.T) {
} }
// Should list // Should list
idx, ents, err = store.KVSList("/web") _, idx, ents, err = store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1556,6 +1637,55 @@ func TestKVS_List(t *testing.T) {
} }
} }
func TestKVSList_TombstoneIndex(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entries
d := &structs.DirEntry{Key: "/web/a", Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
// Nuke the last node
err = store.KVSDeleteTree(1003, "/web/c")
if err != nil {
t.Fatalf("err: %v", err)
}
// Add another node
d = &structs.DirEntry{Key: "/other", Value: []byte("test")}
if err := store.KVSSet(1004, d); err != nil {
t.Fatalf("err: %v", err)
}
// List should properly reflect tombstoned value
tombIdx, idx, ents, err := store.KVSList("/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if tombIdx != 1003 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 2 {
t.Fatalf("bad: %v", ents)
}
}
func TestKVS_ListKeys(t *testing.T) { func TestKVS_ListKeys(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {
@ -1730,6 +1860,68 @@ func TestKVS_ListKeys_Index(t *testing.T) {
} }
} }
func TestKVS_ListKeys_TombstoneIndex(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entries
d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1003, d); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.KVSDelete(1004, "/baz/c"); err != nil {
t.Fatalf("err: %v", err)
}
idx, keys, err := store.KVSListKeys("/foo", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1000 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 1 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/ba", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 1 {
t.Fatalf("bad: %v", keys)
}
idx, keys, err = store.KVSListKeys("/nope", "")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1004 {
t.Fatalf("bad: %v", idx)
}
if len(keys) != 0 {
t.Fatalf("bad: %v", keys)
}
}
func TestKVSDeleteTree(t *testing.T) { func TestKVSDeleteTree(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {
@ -1737,6 +1929,15 @@ func TestKVSDeleteTree(t *testing.T) {
} }
defer store.Close() defer store.Close()
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("err: %v", err)
}
gc.SetEnabled(true)
store.gc = gc
// Should not exist // Should not exist
err = store.KVSDeleteTree(1000, "/web") err = store.KVSDeleteTree(1000, "/web")
if err != nil { if err != nil {
@ -1764,16 +1965,134 @@ func TestKVSDeleteTree(t *testing.T) {
} }
// Nothing should list // Nothing should list
idx, ents, err := store.KVSList("/web") tombIdx, idx, ents, err := store.KVSList("/web")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if idx != 1010 { if idx != 1010 {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
if tombIdx != 1010 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 0 { if len(ents) != 0 {
t.Fatalf("bad: %v", ents) t.Fatalf("bad: %v", ents)
} }
// Check tombstones exists
_, res, err := store.tombstoneTable.Get("id_prefix", "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 3 {
t.Fatalf("bad: %#v", d)
}
for _, r := range res {
if r.(*structs.DirEntry).ModifyIndex != 1010 {
t.Fatalf("bad: %#v", r)
}
}
// Check that we get a delete
select {
case idx := <-gc.ExpireCh():
if idx != 1010 {
t.Fatalf("bad %d", idx)
}
case <-time.After(20 * time.Millisecond):
t.Fatalf("should expire")
}
}
func TestReapTombstones(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("err: %v", err)
}
gc.SetEnabled(true)
store.gc = gc
// Should not exist
err = store.KVSDeleteTree(1000, "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
// Create the entries
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
// Nuke just a
err = store.KVSDelete(1010, "/web/a")
if err != nil {
t.Fatalf("err: %v", err)
}
// Nuke the web tree
err = store.KVSDeleteTree(1020, "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
// Do a reap, should be a noop
if err := store.ReapTombstones(1000); err != nil {
t.Fatalf("err: %v", err)
}
// Check tombstones exists
_, res, err := store.tombstoneTable.Get("id_prefix", "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 3 {
t.Fatalf("bad: %#v", d)
}
// Do a reap, should remove just /web/a
if err := store.ReapTombstones(1010); err != nil {
t.Fatalf("err: %v", err)
}
// Check tombstones exists
_, res, err = store.tombstoneTable.Get("id_prefix", "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 2 {
t.Fatalf("bad: %#v", d)
}
// Do a reap, should remove them all
if err := store.ReapTombstones(1025); err != nil {
t.Fatalf("err: %v", err)
}
// Check no tombstones exists
_, res, err = store.tombstoneTable.Get("id_prefix", "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 0 {
t.Fatalf("bad: %#v", d)
}
} }
func TestSessionCreate(t *testing.T) { func TestSessionCreate(t *testing.T) {

View file

@ -23,6 +23,7 @@ const (
KVSRequestType KVSRequestType
SessionRequestType SessionRequestType
ACLRequestType ACLRequestType
TombstoneRequestType
) )
const ( const (
@ -531,6 +532,24 @@ type EventFireResponse struct {
QueryMeta QueryMeta
} }
type TombstoneOp string
const (
TombstoneReap TombstoneOp = "reap"
)
// TombstoneRequest is used to trigger a reaping of the tombstones
type TombstoneRequest struct {
Datacenter string
Op TombstoneOp
ReapIndex uint64
WriteRequest
}
func (r *TombstoneRequest) RequestDatacenter() string {
return r.Datacenter
}
// msgpackHandle is a shared handle for encoding/decoding of structs // msgpackHandle is a shared handle for encoding/decoding of structs
var msgpackHandle = &codec.MsgpackHandle{} var msgpackHandle = &codec.MsgpackHandle{}

150
consul/tombstone_gc.go Normal file
View file

@ -0,0 +1,150 @@
package consul
import (
"fmt"
"sync"
"time"
)
// TombstoneGC is used to track creation of tombstones
// so that they can be garbage collected after their TTL
// expires. The tombstones allow queries to provide monotonic
// index values within the TTL window. The GC is used to
// prevent monotonic growth in storage usage. This is a trade off
// between the length of the TTL and the storage overhead.
//
// In practice, this is required to fix the issue of delete
// visibility. When data is deleted from the KV store, the
// "latest" row can go backwards if the newest row is removed.
// The tombstones provide a way to ensure time doesn't move
// backwards within some interval.
//
type TombstoneGC struct {
ttl time.Duration
granularity time.Duration
// enabled controls if we actually setup any timers.
enabled bool
// expires maps the time of expiration to the highest
// tombstone value that should be expired.
expires map[time.Time]*expireInterval
// expireCh is used to stream expiration
expireCh chan uint64
// lock is used to ensure safe access to all the fields
lock sync.Mutex
}
// expireInterval is used to track the maximum index
// to expire in a given interval with a timer
type expireInterval struct {
maxIndex uint64
timer *time.Timer
}
// NewTombstoneGC is used to construct a new TombstoneGC given
// a TTL for tombstones and a tracking granularity. Longer TTLs
// ensure correct behavior for more time, but use more storage.
// A shorter granularity increases the number of Raft transactions
// and reduce how far past the TTL we perform GC.
func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) {
// Sanity check the inputs
if ttl <= 0 || granularity <= 0 {
return nil, fmt.Errorf("Tombstone TTL and granularity must be positive")
}
t := &TombstoneGC{
ttl: ttl,
granularity: granularity,
enabled: false,
expires: make(map[time.Time]*expireInterval),
expireCh: make(chan uint64, 1),
}
return t, nil
}
// ExpireCh is used to return a channel that streams the next index
// that should be expired
func (t *TombstoneGC) ExpireCh() <-chan uint64 {
return t.expireCh
}
// SetEnabled is used to control if the tombstone GC is
// enabled. Should only be enabled by the leader node.
func (t *TombstoneGC) SetEnabled(enabled bool) {
t.lock.Lock()
defer t.lock.Unlock()
if enabled == t.enabled {
return
}
// Stop all the timers and clear
if !enabled {
for _, exp := range t.expires {
exp.timer.Stop()
}
t.expires = make(map[time.Time]*expireInterval)
}
// Update the status
t.enabled = enabled
}
// Hint is used to indicate that keys at the given index have been
// deleted, and that their GC should be scheduled.
func (t *TombstoneGC) Hint(index uint64) {
expires := t.nextExpires()
t.lock.Lock()
defer t.lock.Unlock()
if !t.enabled {
return
}
// Check for an existing expiration timer
exp, ok := t.expires[expires]
if ok {
// Increment the highest index to be expired at that time
if index > exp.maxIndex {
exp.maxIndex = index
}
return
}
// Create new expiration time
t.expires[expires] = &expireInterval{
maxIndex: index,
timer: time.AfterFunc(expires.Sub(time.Now()), func() {
t.expireTime(expires)
}),
}
}
// PendingExpiration is used to check if any expirations are pending
func (t *TombstoneGC) PendingExpiration() bool {
t.lock.Lock()
defer t.lock.Unlock()
return len(t.expires) > 0
}
// nextExpires is used to calculate the next experation time
func (t *TombstoneGC) nextExpires() time.Time {
expires := time.Now().Add(t.ttl)
remain := expires.UnixNano() % int64(t.granularity)
adj := expires.Add(t.granularity - time.Duration(remain))
return adj
}
// expireTime is used to expire the entries at the given time
func (t *TombstoneGC) expireTime(expires time.Time) {
// Get the maximum index and clear the entry
t.lock.Lock()
exp := t.expires[expires]
delete(t.expires, expires)
t.lock.Unlock()
// Notify the expires channel
t.expireCh <- exp.maxIndex
}

104
consul/tombstone_gc_test.go Normal file
View file

@ -0,0 +1,104 @@
package consul
import (
"testing"
"time"
)
func TestTombstoneGC_invalid(t *testing.T) {
_, err := NewTombstoneGC(0, 0)
if err == nil {
t.Fatalf("should fail")
}
_, err = NewTombstoneGC(time.Second, 0)
if err == nil {
t.Fatalf("should fail")
}
_, err = NewTombstoneGC(0, time.Second)
if err == nil {
t.Fatalf("should fail")
}
}
func TestTombstoneGC(t *testing.T) {
ttl := 20 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("should fail")
}
gc.SetEnabled(true)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")
}
start := time.Now()
gc.Hint(100)
time.Sleep(2 * gran)
start2 := time.Now()
gc.Hint(120)
gc.Hint(125)
if !gc.PendingExpiration() {
t.Fatalf("should be pending")
}
select {
case index := <-gc.ExpireCh():
end := time.Now()
if end.Sub(start) < ttl {
t.Fatalf("expired early")
}
if index != 100 {
t.Fatalf("bad index: %d", index)
}
case <-time.After(ttl * 2):
t.Fatalf("should get expiration")
}
select {
case index := <-gc.ExpireCh():
end := time.Now()
if end.Sub(start2) < ttl {
t.Fatalf("expired early")
}
if index != 125 {
t.Fatalf("bad index: %d", index)
}
case <-time.After(ttl * 2):
t.Fatalf("should get expiration")
}
}
func TestTombstoneGC_Expire(t *testing.T) {
ttl := 10 * time.Millisecond
gran := 5 * time.Millisecond
gc, err := NewTombstoneGC(ttl, gran)
if err != nil {
t.Fatalf("should fail")
}
gc.SetEnabled(true)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")
}
gc.Hint(100)
gc.SetEnabled(false)
if gc.PendingExpiration() {
t.Fatalf("should not be pending")
}
select {
case <-gc.ExpireCh():
t.Fatalf("shoudl be reset")
case <-time.After(20 * time.Millisecond):
}
}