From d5369098ba7fd383b416e0236fa42b0e4ec42561 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 30 Nov 2014 22:05:18 -0700 Subject: [PATCH 01/30] consul: Adding TombstoneGC config --- consul/config.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/consul/config.go b/consul/config.go index 9cb1944cb..c3614f0b2 100644 --- a/consul/config.go +++ b/consul/config.go @@ -158,6 +158,24 @@ type Config struct { // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string + // TombstoneGC is used to control how long KV tombstones are retained. + // This provides a window of time where the X-Consul-Index is monotonic. + // Outside this window, the index may not be monotonic. This is a result + // of a few trade offs: + // 1) The index is defined by the data view and not globally. This is a + // performance optimization that prevents any write from incrementing the + // index for all data views. + // 2) Tombstones are not kept indefinitely, since otherwise storage required + // is also monotonic. This prevents deletes from reducing the disk space + // used. + // In theory, neither of these are intrinsic limitations, however for the + // purposes of building a practical system, they are reaonable trade offs. + // + // It is also possible to set this to an incredibly long time, thereby + // simulating infinite retention. This is not recommended however. + // + TombstoneGC time.Duration + // ServerUp callback can be used to trigger a notification that // a Consul server is now up and known about. ServerUp func() @@ -216,6 +234,7 @@ func DefaultConfig() *Config { ACLTTL: 30 * time.Second, ACLDefaultPolicy: "allow", ACLDownPolicy: "extend-cache", + TombstoneGC: 15 * time.Minute, } // Increase our reap interval to 3 days instead of 24h. From 68caf9046c9afe28cd533aba9df032662b8cba21 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Dec 2014 11:22:29 -0800 Subject: [PATCH 02/30] consul: Create tombstones before key deletes --- consul/state_store.go | 82 ++++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 273311435..fcdb54649 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -20,6 +20,7 @@ const ( dbServices = "services" dbChecks = "checks" dbKVS = "kvs" + dbTombstone = "tombstones" dbSessions = "sessions" dbSessionChecks = "sessionChecks" dbACLs = "acls" @@ -54,6 +55,7 @@ type StateStore struct { serviceTable *MDBTable checkTable *MDBTable kvsTable *MDBTable + tombstoneTable *MDBTable sessionTable *MDBTable sessionCheckTable *MDBTable aclTable *MDBTable @@ -283,6 +285,29 @@ func (s *StateStore) initialize() error { }, } + s.tombstoneTable = &MDBTable{ + Name: dbTombstone, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.DirEntry) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + s.sessionTable = &MDBTable{ Name: dbSessions, Indexes: map[string]*MDBIndex{ @@ -340,7 +365,8 @@ func (s *StateStore) initialize() error { // Store the set of tables s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, - s.kvsTable, s.sessionTable, s.sessionCheckTable, s.aclTable} + s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable, + s.aclTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -1177,12 +1203,29 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { - // Start a new txn tx, err := s.kvsTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() + return s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...) +} + +// kvsDeleteWithIndexTxn does a delete within an existing transaction +func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { + // Create the appropriate tombstone entries + streamCh := make(chan interface{}, 128) + doneCh := make(chan struct{}) + var tombstoneErr error + go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr) + err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...) + <-doneCh + if err != nil { + return err + } + if tombstoneErr != nil { + return tombstoneErr + } num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) if err != nil { @@ -1198,6 +1241,22 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . return tx.Commit() } +// kvsTombstoneEntries is used to consume KVS entries over a stream +// and commit them as tombstones within a given transaction and index. +func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) { + defer close(doneCh) + for raw := range streamCh { + ent := raw.(*structs.DirEntry) + ent.ModifyIndex = index + ent.Value = nil + ent.Session = "" + if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { + s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err) + *errOut = err + } + } +} + // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvCAS) @@ -1537,7 +1596,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro if session.Behavior == structs.SessionKeysDelete { // delete the keys held by the session - if err := s.deleteKeys(index, tx, id); err != nil { + if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil { return err } @@ -1618,23 +1677,6 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, return nil } -// deleteKeys is used to delete all the keys created by a session -// within a given txn. All tables should be locked in the tx. -func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error { - num, err := s.kvsTable.DeleteTxn(tx, "session", id) - if err != nil { - return err - } - - if num > 0 { - if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { - return err - } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) - } - return nil -} - // ACLSet is used to create or update an ACL entry func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { // Check for an ID From 3e2bd0db2c3bed43fc6f0a86caf377c0f7d2f973 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 16:38:33 -0800 Subject: [PATCH 03/30] consul: Rename TombstoneGC to TombstoneTTL --- consul/config.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/consul/config.go b/consul/config.go index c3614f0b2..4e834b178 100644 --- a/consul/config.go +++ b/consul/config.go @@ -158,7 +158,7 @@ type Config struct { // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string - // TombstoneGC is used to control how long KV tombstones are retained. + // TombstoneTTL is used to control how long KV tombstones are retained. // This provides a window of time where the X-Consul-Index is monotonic. // Outside this window, the index may not be monotonic. This is a result // of a few trade offs: @@ -174,7 +174,12 @@ type Config struct { // It is also possible to set this to an incredibly long time, thereby // simulating infinite retention. This is not recommended however. // - TombstoneGC time.Duration + TombstoneTTL time.Duration + + // TombstoneTTLGranularity is used to control how granular the timers are + // for the Tombstone GC. This is used to batch the GC of many keys together + // to reduce overhead. It is unlikely a user would ever need to tune this. + TombstoneTTLGranularity time.Duration // ServerUp callback can be used to trigger a notification that // a Consul server is now up and known about. @@ -223,18 +228,19 @@ func DefaultConfig() *Config { } conf := &Config{ - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersionMax, - ACLTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", - TombstoneGC: 15 * time.Minute, + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: serf.DefaultConfig(), + SerfWANConfig: serf.DefaultConfig(), + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersionMax, + ACLTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, } // Increase our reap interval to 3 days instead of 24h. From 4430f4592de8e9886f9c9a2bb37881b0d900181c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 17:17:29 -0800 Subject: [PATCH 04/30] consul: Adding TombstoneGC to track TTLs --- consul/tombstone_gc.go | 104 ++++++++++++++++++++++++++++++++++++ consul/tombstone_gc_test.go | 68 +++++++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 consul/tombstone_gc.go create mode 100644 consul/tombstone_gc_test.go diff --git a/consul/tombstone_gc.go b/consul/tombstone_gc.go new file mode 100644 index 000000000..149baa601 --- /dev/null +++ b/consul/tombstone_gc.go @@ -0,0 +1,104 @@ +package consul + +import ( + "fmt" + "sync" + "time" +) + +// TombstoneGC is used to track creation of tombstones +// so that they can be garbage collected after their TTL +// expires. The tombstones allow queries to provide monotonic +// index values within the TTL window. The GC is used to +// prevent monotonic growth in storage usage. This is a trade off +// between the length of the TTL and the storage overhead. +// +// In practice, this is required to fix the issue of delete +// visibility. When data is deleted from the KV store, the +// "latest" row can go backwards if the newest row is removed. +// The tombstones provide a way to ensure time doesn't move +// backwards within some interval. +// +type TombstoneGC struct { + ttl time.Duration + granularity time.Duration + + // expires maps the time of expiration to the highest + // tombstone value that should be expired. + expires map[time.Time]uint64 + expiresLock sync.Mutex + + // expireCh is used to stream expiration + expireCh chan uint64 +} + +// NewTombstoneGC is used to construct a new TombstoneGC given +// a TTL for tombstones and a tracking granularity. Longer TTLs +// ensure correct behavior for more time, but use more storage. +// A shorter granularity increases the number of Raft transactions +// and reduce how far past the TTL we perform GC. +func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) { + // Sanity check the inputs + if ttl <= 0 || granularity <= 0 { + return nil, fmt.Errorf("Tombstone TTL and granularity must be positive") + } + + t := &TombstoneGC{ + ttl: ttl, + granularity: granularity, + expires: make(map[time.Time]uint64), + expireCh: make(chan uint64, 1), + } + return t, nil +} + +// ExpireCh is used to return a channel that streams the next index +// that should be expired +func (t *TombstoneGC) ExpireCh() <-chan uint64 { + return t.expireCh +} + +// Hint is used to indicate that keys at the given index have been +// deleted, and that their GC should be scheduled. +func (t *TombstoneGC) Hint(index uint64) { + expires := t.nextExpires() + + t.expiresLock.Lock() + defer t.expiresLock.Unlock() + + // Check for an existing expiration timer + existing, ok := t.expires[expires] + if ok { + // Increment the highest index to be expired at that time + if index > existing { + t.expires[expires] = index + } + return + } + + // Create new expiration time + t.expires[expires] = index + time.AfterFunc(expires.Sub(time.Now()), func() { + t.expireTime(expires) + }) +} + +// nextExpires is used to calculate the next experation time +func (t *TombstoneGC) nextExpires() time.Time { + expires := time.Now().Add(t.ttl) + remain := expires.UnixNano() % int64(t.granularity) + adj := expires.Add(t.granularity - time.Duration(remain)) + return adj +} + +// expireTime is used to expire the entries at the given time +func (t *TombstoneGC) expireTime(expires time.Time) { + // Get the maximum index and clear the entry + t.expiresLock.Lock() + index := t.expires[expires] + delete(t.expires, expires) + t.expiresLock.Unlock() + + // Notify the expires channel + t.expireCh <- index +} diff --git a/consul/tombstone_gc_test.go b/consul/tombstone_gc_test.go new file mode 100644 index 000000000..d169d611b --- /dev/null +++ b/consul/tombstone_gc_test.go @@ -0,0 +1,68 @@ +package consul + +import ( + "testing" + "time" +) + +func TestTombstoneGC_invalid(t *testing.T) { + _, err := NewTombstoneGC(0, 0) + if err == nil { + t.Fatalf("should fail") + } + + _, err = NewTombstoneGC(time.Second, 0) + if err == nil { + t.Fatalf("should fail") + } + + _, err = NewTombstoneGC(0, time.Second) + if err == nil { + t.Fatalf("should fail") + } +} + +func TestTombstoneGC(t *testing.T) { + ttl := 20 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("should fail") + } + + start := time.Now() + gc.Hint(100) + + time.Sleep(2 * gran) + start2 := time.Now() + gc.Hint(120) + gc.Hint(125) + + select { + case index := <-gc.ExpireCh(): + end := time.Now() + if end.Sub(start) < ttl { + t.Fatalf("expired early") + } + if index != 100 { + t.Fatalf("bad index: %d", index) + } + + case <-time.After(ttl * 2): + t.Fatalf("should get expiration") + } + + select { + case index := <-gc.ExpireCh(): + end := time.Now() + if end.Sub(start2) < ttl { + t.Fatalf("expired early") + } + if index != 125 { + t.Fatalf("bad index: %d", index) + } + + case <-time.After(ttl * 2): + t.Fatalf("should get expiration") + } +} From 2724061351db02648a4a46202f2318c166aad78a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 22:33:26 -0800 Subject: [PATCH 05/30] consul: Support reset of tombstone GC --- consul/tombstone_gc.go | 41 +++++++++++++++++++++++++++---------- consul/tombstone_gc_test.go | 18 ++++++++++++++++ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/consul/tombstone_gc.go b/consul/tombstone_gc.go index 149baa601..e8e003255 100644 --- a/consul/tombstone_gc.go +++ b/consul/tombstone_gc.go @@ -25,13 +25,20 @@ type TombstoneGC struct { // expires maps the time of expiration to the highest // tombstone value that should be expired. - expires map[time.Time]uint64 + expires map[time.Time]*expireInterval expiresLock sync.Mutex // expireCh is used to stream expiration expireCh chan uint64 } +// expireInterval is used to track the maximum index +// to expire in a given interval with a timer +type expireInterval struct { + maxIndex uint64 + timer *time.Timer +} + // NewTombstoneGC is used to construct a new TombstoneGC given // a TTL for tombstones and a tracking granularity. Longer TTLs // ensure correct behavior for more time, but use more storage. @@ -46,7 +53,7 @@ func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) { t := &TombstoneGC{ ttl: ttl, granularity: granularity, - expires: make(map[time.Time]uint64), + expires: make(map[time.Time]*expireInterval), expireCh: make(chan uint64, 1), } return t, nil @@ -58,6 +65,16 @@ func (t *TombstoneGC) ExpireCh() <-chan uint64 { return t.expireCh } +// Reset is used to clear the TTL timers +func (t *TombstoneGC) Reset() { + t.expiresLock.Lock() + defer t.expiresLock.Unlock() + for _, exp := range t.expires { + exp.timer.Stop() + } + t.expires = make(map[time.Time]*expireInterval) +} + // Hint is used to indicate that keys at the given index have been // deleted, and that their GC should be scheduled. func (t *TombstoneGC) Hint(index uint64) { @@ -67,20 +84,22 @@ func (t *TombstoneGC) Hint(index uint64) { defer t.expiresLock.Unlock() // Check for an existing expiration timer - existing, ok := t.expires[expires] + exp, ok := t.expires[expires] if ok { // Increment the highest index to be expired at that time - if index > existing { - t.expires[expires] = index + if index > exp.maxIndex { + exp.maxIndex = index } return } // Create new expiration time - t.expires[expires] = index - time.AfterFunc(expires.Sub(time.Now()), func() { - t.expireTime(expires) - }) + t.expires[expires] = &expireInterval{ + maxIndex: index, + timer: time.AfterFunc(expires.Sub(time.Now()), func() { + t.expireTime(expires) + }), + } } // nextExpires is used to calculate the next experation time @@ -95,10 +114,10 @@ func (t *TombstoneGC) nextExpires() time.Time { func (t *TombstoneGC) expireTime(expires time.Time) { // Get the maximum index and clear the entry t.expiresLock.Lock() - index := t.expires[expires] + exp := t.expires[expires] delete(t.expires, expires) t.expiresLock.Unlock() // Notify the expires channel - t.expireCh <- index + t.expireCh <- exp.maxIndex } diff --git a/consul/tombstone_gc_test.go b/consul/tombstone_gc_test.go index d169d611b..6f8746734 100644 --- a/consul/tombstone_gc_test.go +++ b/consul/tombstone_gc_test.go @@ -66,3 +66,21 @@ func TestTombstoneGC(t *testing.T) { t.Fatalf("should get expiration") } } + +func TestTombstoneGC_Expire(t *testing.T) { + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("should fail") + } + + gc.Hint(100) + gc.Reset() + + select { + case <-gc.ExpireCh(): + t.Fatalf("shoudl be reset") + case <-time.After(20 * time.Millisecond): + } +} From 1a9431847bebbed39f4af2569113dc74c646ea0a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 23:37:22 -0800 Subject: [PATCH 06/30] consul: Adding GetTxnLimit to MDBTable --- consul/mdb_table.go | 42 ++++++++++++++++----- consul/mdb_table_test.go | 80 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 111 insertions(+), 11 deletions(-) diff --git a/consul/mdb_table.go b/consul/mdb_table.go index 23856fc5f..37eb52503 100644 --- a/consul/mdb_table.go +++ b/consul/mdb_table.go @@ -389,10 +389,32 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac // Accumulate the results var results []interface{} - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { obj := t.Decoder(res) results = append(results, obj) - return false + return false, false + }) + + return results, err +} + +// GetTxnLimit is like GetTxn limits the maximum number of +// rows it will return +func (t *MDBTable) GetTxnLimit(tx *MDBTxn, limit int, index string, parts ...string) ([]interface{}, error) { + // Get the associated index + idx, key, err := t.getIndex(index, parts) + if err != nil { + return nil, err + } + + // Accumulate the results + var results []interface{} + num := 0 + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { + num++ + obj := t.Decoder(res) + results = append(results, obj) + return false, num == limit }) return results, err @@ -412,10 +434,10 @@ func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string } // Stream the results - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { obj := t.Decoder(res) stream <- obj - return false + return false, false }) return err @@ -508,7 +530,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) ( }() // Delete everything as we iterate - err = idx.iterate(tx, key, func(encRowId, res []byte) bool { + err = idx.iterate(tx, key, func(encRowId, res []byte) (bool, bool) { // Get the object obj := t.Decoder(res) @@ -542,7 +564,7 @@ func (t *MDBTable) innerDeleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) ( // Delete the object num++ - return true + return true, false }) if err != nil { return 0, err @@ -644,7 +666,7 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte { // and invoking the cb with each row. We dereference the rowid, // and only return the object row func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, - cb func(encRowId, res []byte) bool) error { + cb func(encRowId, res []byte) (bool, bool)) error { table := tx.dbis[i.table.Name] // If virtual, use the correct DBI @@ -667,8 +689,9 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, var key, encRowId, objBytes []byte first := true + shouldStop := false shouldDelete := false - for { + for !shouldStop { if first && len(prefix) > 0 { first = false key, encRowId, err = cursor.Get(prefix, mdb.SET_RANGE) @@ -708,7 +731,8 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte, } // Invoke the cb - if shouldDelete = cb(encRowId, objBytes); shouldDelete { + shouldDelete, shouldStop = cb(encRowId, objBytes) + if shouldDelete { if err := cursor.Del(0); err != nil { return fmt.Errorf("delete failed: %v", err) } diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index e70c9131f..73e4001d1 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -2,12 +2,13 @@ package consul import ( "bytes" - "github.com/armon/gomdb" - "github.com/hashicorp/go-msgpack/codec" "io/ioutil" "os" "reflect" "testing" + + "github.com/armon/gomdb" + "github.com/hashicorp/go-msgpack/codec" ) type MockData struct { @@ -970,3 +971,78 @@ func TestMDBTableStream(t *testing.T) { t.Fatalf("bad index: %d", idx) } } + +func TestMDBTableGetTxnLimit(t *testing.T) { + dir, env := testMDBEnv(t) + defer os.RemoveAll(dir) + defer env.Close() + + table := &MDBTable{ + Env: env, + Name: "test", + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "name": &MDBIndex{ + Fields: []string{"First", "Last"}, + }, + "country": &MDBIndex{ + Fields: []string{"Country"}, + }, + }, + Encoder: MockEncoder, + Decoder: MockDecoder, + } + if err := table.Init(); err != nil { + t.Fatalf("err: %v", err) + } + + objs := []*MockData{ + &MockData{ + Key: "1", + First: "Kevin", + Last: "Smith", + Country: "USA", + }, + &MockData{ + Key: "2", + First: "Kevin", + Last: "Wang", + Country: "USA", + }, + &MockData{ + Key: "3", + First: "Bernardo", + Last: "Torres", + Country: "Mexico", + }, + } + + // Insert some mock objects + for idx, obj := range objs { + if err := table.Insert(obj); err != nil { + t.Fatalf("err: %v", err) + } + if err := table.SetLastIndex(uint64(idx + 1)); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Start a readonly txn + tx, err := table.StartTxn(true, nil) + if err != nil { + panic(err) + } + defer tx.Abort() + + // Verify with some gets + res, err := table.GetTxnLimit(tx, 2, "id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("expect 2 result: %#v", res) + } +} From 4da4e322a361652802bc78aefdd0e5c325645056 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 23:39:57 -0800 Subject: [PATCH 07/30] consul: Fixing tombstone creation and hinting of GC --- consul/state_store.go | 79 ++++++++++++++++++++------------------ consul/state_store_test.go | 38 +++++++++++++++++- 2 files changed, 79 insertions(+), 38 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index fcdb54649..e3b43b801 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -78,6 +78,10 @@ type StateStore struct { // is never questioned. lockDelay map[string]time.Time lockDelayLock sync.RWMutex + + // GC is when we create tombstones to track their time-to-live. + // The GC is consumed upstream to manage clearing of tombstones. + gc *TombstoneGC } // StateSnapshot is used to provide a point-in-time snapshot @@ -104,18 +108,18 @@ func (s *StateSnapshot) Close() error { } // NewStateStore is used to create a new state store -func NewStateStore(logOutput io.Writer) (*StateStore, error) { +func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { // Create a new temp dir path, err := ioutil.TempDir("", "consul") if err != nil { return nil, err } - return NewStateStorePath(path, logOutput) + return NewStateStorePath(gc, path, logOutput) } // NewStateStorePath is used to create a new state store at a given path // The path is cleared on closing. -func NewStateStorePath(path string, logOutput io.Writer) (*StateStore, error) { +func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { // Open the env env, err := mdb.NewEnv() if err != nil { @@ -1203,7 +1207,7 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { - tx, err := s.kvsTable.StartTxn(false, nil) + tx, err := s.tables.StartTxn(false) if err != nil { return err } @@ -1213,50 +1217,51 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . // kvsDeleteWithIndexTxn does a delete within an existing transaction func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { - // Create the appropriate tombstone entries - streamCh := make(chan interface{}, 128) - doneCh := make(chan struct{}) - var tombstoneErr error - go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr) - err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...) - <-doneCh - if err != nil { - return err - } - if tombstoneErr != nil { - return tombstoneErr - } + num := 0 + for { + // Get some number of entries to delete + pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...) + if err != nil { + return err + } - num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) - if err != nil { - return err + // Create the tombstones and delete + for _, raw := range pairs { + ent := raw.(*structs.DirEntry) + ent.ModifyIndex = index // Update the index + ent.Value = nil // Reduce storage required + ent.Session = "" + if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { + return err + } + if _, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { + return err + } + } + + // Increment the total number + num += len(pairs) + if len(pairs) == 0 { + break + } } if num > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + tx.Defer(func() { + s.watch[s.kvsTable].Notify() + if s.gc != nil { + // If GC is configured, then we hint that this index + // required expiration. + s.gc.Hint(index) + } + }) } return tx.Commit() } -// kvsTombstoneEntries is used to consume KVS entries over a stream -// and commit them as tombstones within a given transaction and index. -func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) { - defer close(doneCh) - for raw := range streamCh { - ent := raw.(*structs.DirEntry) - ent.ModifyIndex = index - ent.Value = nil - ent.Session = "" - if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { - s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err) - *errOut = err - } - } -} - // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvCAS) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 888b4a343..621734ef3 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -11,7 +11,7 @@ import ( ) func testStateStore() (*StateStore, error) { - return NewStateStore(os.Stderr) + return NewStateStore(nil, os.Stderr) } func TestEnsureRegistration(t *testing.T) { @@ -1413,6 +1413,14 @@ func TestKVSDelete(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + store.gc = gc + // Create the entry d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")} if err := store.KVSSet(1000, d); err != nil { @@ -1435,6 +1443,16 @@ func TestKVSDelete(t *testing.T) { if d != nil { t.Fatalf("bad: %v", d) } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1020 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } } func TestKVSCheckAndSet(t *testing.T) { @@ -1737,6 +1755,14 @@ func TestKVSDeleteTree(t *testing.T) { } defer store.Close() + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + store.gc = gc + // Should not exist err = store.KVSDeleteTree(1000, "/web") if err != nil { @@ -1774,6 +1800,16 @@ func TestKVSDeleteTree(t *testing.T) { if len(ents) != 0 { t.Fatalf("bad: %v", ents) } + + // Check that we get a delete + select { + case idx := <-gc.ExpireCh(): + if idx != 1010 { + t.Fatalf("bad %d", idx) + } + case <-time.After(20 * time.Millisecond): + t.Fatalf("should expire") + } } func TestSessionCreate(t *testing.T) { From ae69cbca7b983856ab9e148bd1c51a6cf20cc6f7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 23:49:23 -0800 Subject: [PATCH 08/30] consul: Fixing accidental commit of transaction --- consul/state_store.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index e3b43b801..10883dd33 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1212,7 +1212,10 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . return err } defer tx.Abort() - return s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...) + if err := s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...); err != nil { + return err + } + return tx.Commit() } // kvsDeleteWithIndexTxn does a delete within an existing transaction @@ -1259,7 +1262,7 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex } }) } - return tx.Commit() + return nil } // KVSCheckAndSet is used to perform an atomic check-and-set From 71c2c1468d539562001ba586e26dd50a843cedfc Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 10 Dec 2014 23:49:44 -0800 Subject: [PATCH 09/30] consul: Thread Tombstone GC through --- consul/fsm.go | 8 +++++--- consul/fsm_test.go | 30 +++++++++++++++--------------- consul/issue_test.go | 2 +- consul/server.go | 13 ++++++++++++- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index 903137044..875eb9ae3 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -21,6 +21,7 @@ type consulFSM struct { logger *log.Logger path string state *StateStore + gc *TombstoneGC } // consulSnapshot is used to provide a snapshot of the current @@ -38,7 +39,7 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { +func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { // Create a temporary path for the state store tmpPath, err := ioutil.TempDir(path, "state") if err != nil { @@ -46,7 +47,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { } // Create a state store - state, err := NewStateStorePath(tmpPath, logOutput) + state, err := NewStateStorePath(gc, tmpPath, logOutput) if err != nil { return nil, err } @@ -56,6 +57,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) { logger: log.New(logOutput, "", log.LstdFlags), path: path, state: state, + gc: gc, } return fsm, nil } @@ -236,7 +238,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } // Create a new state store - state, err := NewStateStorePath(tmpPath, c.logOutput) + state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput) if err != nil { return err } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index db01580e4..9db2c8c3a 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -42,7 +42,7 @@ func TestFSM_RegisterNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -82,7 +82,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -139,7 +139,7 @@ func TestFSM_DeregisterService(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -198,7 +198,7 @@ func TestFSM_DeregisterCheck(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -257,7 +257,7 @@ func TestFSM_DeregisterNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -328,7 +328,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -372,7 +372,7 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Try to restore on a new FSM - fsm2, err := NewFSM(path, os.Stderr) + fsm2, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -453,7 +453,7 @@ func TestFSM_KVSSet(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -492,7 +492,7 @@ func TestFSM_KVSDelete(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -542,7 +542,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -593,7 +593,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -654,7 +654,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -738,7 +738,7 @@ func TestFSM_KVSLock(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -787,7 +787,7 @@ func TestFSM_KVSUnlock(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -854,7 +854,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/issue_test.go b/consul/issue_test.go index 77bca7ce9..5676c6a1d 100644 --- a/consul/issue_test.go +++ b/consul/issue_test.go @@ -15,7 +15,7 @@ func TestHealthCheckRace(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(path, os.Stderr) + fsm, err := NewFSM(nil, path, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/server.go b/consul/server.go index 9cdfa0b53..88096a175 100644 --- a/consul/server.go +++ b/consul/server.go @@ -134,6 +134,10 @@ type Server struct { sessionTimers map[string]*time.Timer sessionTimersLock sync.Mutex + // tombstoneGC is used to track the pending GC invocations + // for the KV tombstones + tombstoneGC *TombstoneGC + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -189,6 +193,12 @@ func NewServer(config *Config) (*Server, error) { // Create a logger logger := log.New(config.LogOutput, "", log.LstdFlags) + // Create the tombstone GC + gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity) + if err != nil { + return nil, err + } + // Create server s := &Server{ config: config, @@ -201,6 +211,7 @@ func NewServer(config *Config) (*Server, error) { remoteConsuls: make(map[string][]*serverParts), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, + tombstoneGC: gc, shutdownCh: make(chan struct{}), } @@ -320,7 +331,7 @@ func (s *Server) setupRaft() error { // Create the FSM var err error - s.fsm, err = NewFSM(statePath, s.config.LogOutput) + s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput) if err != nil { return err } From fb8f7fd929673d544096d91eefdf0bedd3978ce5 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 14:22:32 -0800 Subject: [PATCH 10/30] consul: Adding PendingExpiration --- consul/tombstone_gc.go | 7 +++++++ consul/tombstone_gc_test.go | 16 ++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/consul/tombstone_gc.go b/consul/tombstone_gc.go index e8e003255..0d0da74ad 100644 --- a/consul/tombstone_gc.go +++ b/consul/tombstone_gc.go @@ -102,6 +102,13 @@ func (t *TombstoneGC) Hint(index uint64) { } } +// PendingExpiration is used to check if any expirations are pending +func (t *TombstoneGC) PendingExpiration() bool { + t.expiresLock.Lock() + defer t.expiresLock.Unlock() + return len(t.expires) > 0 +} + // nextExpires is used to calculate the next experation time func (t *TombstoneGC) nextExpires() time.Time { expires := time.Now().Add(t.ttl) diff --git a/consul/tombstone_gc_test.go b/consul/tombstone_gc_test.go index 6f8746734..6a425a596 100644 --- a/consul/tombstone_gc_test.go +++ b/consul/tombstone_gc_test.go @@ -30,6 +30,10 @@ func TestTombstoneGC(t *testing.T) { t.Fatalf("should fail") } + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + start := time.Now() gc.Hint(100) @@ -38,6 +42,10 @@ func TestTombstoneGC(t *testing.T) { gc.Hint(120) gc.Hint(125) + if !gc.PendingExpiration() { + t.Fatalf("should be pending") + } + select { case index := <-gc.ExpireCh(): end := time.Now() @@ -75,9 +83,17 @@ func TestTombstoneGC_Expire(t *testing.T) { t.Fatalf("should fail") } + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + gc.Hint(100) gc.Reset() + if gc.PendingExpiration() { + t.Fatalf("should not be pending") + } + select { case <-gc.ExpireCh(): t.Fatalf("shoudl be reset") From 9f30ffbf9a8ce4d4bfa5c2eda7063addb81c4721 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 14:37:49 -0800 Subject: [PATCH 11/30] consul: Leader should reset the tombstone GC clock --- consul/leader.go | 8 +++++ consul/leader_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/consul/leader.go b/consul/leader.go index 7c9484b8f..5c7c3ef34 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -121,6 +121,14 @@ WAIT: // previously inflight transactions have been commited and that our // state is up-to-date. func (s *Server) establishLeadership() error { + // Hint the tombstone expiration timer. When we freshly establish leadership + // we become the authoritative timer, and so we need to start the clock + // on any pending GC events. + s.tombstoneGC.Reset() + lastIndex := s.raft.LastIndex() + s.tombstoneGC.Hint(lastIndex) + s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex) + // Setup ACLs if we are the leader and need to if err := s.initializeACL(); err != nil { s.logger.Printf("[ERR] consul: ACL initialization failed: %v", err) diff --git a/consul/leader_test.go b/consul/leader_test.go index 1bd910858..b41ac9755 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -436,3 +436,78 @@ func TestLeader_MultiBootstrap(t *testing.T) { } } } + +func TestLeader_TombstoneGC_Reset(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.raftPeers.Peers() + return len(peers) == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + var leader *Server + for _, s := range servers { + if s.IsLeader() { + leader = s + break + } + } + if leader == nil { + t.Fatalf("Should have a leader") + } + + // Check that the leader has a pending GC expiration + if !leader.tombstoneGC.PendingExpiration() { + t.Fatalf("should have pending expiration") + } + + // Kill the leader + leader.Shutdown() + time.Sleep(100 * time.Millisecond) + + // Wait for a new leader + leader = nil + testutil.WaitForResult(func() (bool, error) { + for _, s := range servers { + if s.IsLeader() { + leader = s + return true, nil + } + } + return false, nil + }, func(err error) { + t.Fatalf("should have leader") + }) + + // Check that the new leader has a pending GC expiration + testutil.WaitForResult(func() (bool, error) { + return leader.tombstoneGC.PendingExpiration(), nil + }, func(err error) { + t.Fatalf("should have pending expiration") + }) +} From 02e984e4c4106a954c31f2ca8e1e698e15fa906a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 15:01:04 -0800 Subject: [PATCH 12/30] consul: Adding new request to reap tombstones --- consul/structs/structs.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 2072780f3..01a4f455f 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -23,6 +23,7 @@ const ( KVSRequestType SessionRequestType ACLRequestType + TombstoneReapRequestType ) const ( @@ -531,6 +532,17 @@ type EventFireResponse struct { QueryMeta } +// TombstoneReapRequest is used to trigger a reaping of the tombstones +type TombstoneReapRequest struct { + Datacenter string + ReapIndex uint64 + WriteRequest +} + +func (r *TombstoneReapRequest) RequestDatacenter() string { + return r.Datacenter +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{} From 8681d913ba469d4dc1fe9fc9adfcd49e00abc4c9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 15:04:21 -0800 Subject: [PATCH 13/30] consul: Generate a raft operation to reap tombstones --- consul/fsm.go | 10 ++++++++++ consul/leader.go | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/consul/fsm.go b/consul/fsm.go index 875eb9ae3..a105d2716 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -85,6 +85,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applySessionOperation(buf[1:], log.Index) case structs.ACLRequestType: return c.applyACLOperation(buf[1:], log.Index) + case structs.TombstoneReapRequestType: + return c.applyTombstoneReapOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -215,6 +217,14 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { } } +func (c *consulFSM) applyTombstoneReapOperation(buf []byte, index uint64) interface{} { + var req structs.TombstoneReapRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + return nil +} + func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { defer func(start time.Time) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) diff --git a/consul/leader.go b/consul/leader.go index 5c7c3ef34..a280fa1d4 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -112,6 +112,8 @@ WAIT: goto RECONCILE case member := <-reconcileCh: s.reconcileMember(member) + case index := <-s.tombstoneGC.ExpireCh(): + go s.reapTombstones(index) } } } @@ -526,3 +528,22 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { } return nil } + +// reapTombstones is invoked by the current leader to manage garbage +// collection of tombstones. When a key is deleted, we trigger a tombstone +// GC clock. Once the expiration is reached, this routine is invoked +// to clear all tombstones before this index. This must be replicated +// through Raft to ensure consistency. We do this outside the leader loop +// to avoid blocking. +func (s *Server) reapTombstones(index uint64) { + req := structs.TombstoneReapRequest{ + Datacenter: s.config.Datacenter, + ReapIndex: index, + WriteRequest: structs.WriteRequest{Token: s.config.ACLToken}, + } + _, err := s.raftApply(structs.TombstoneReapRequestType, &req) + if err != nil { + s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v", + index, err) + } +} From 0c9cbdb3d1eac595ee4496f9467278b5adaca890 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 15:26:46 -0800 Subject: [PATCH 14/30] consul: TombstoneReapRequestType -> TombstoneRequestType --- consul/structs/structs.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 01a4f455f..4b9263c54 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -23,7 +23,7 @@ const ( KVSRequestType SessionRequestType ACLRequestType - TombstoneReapRequestType + TombstoneRequestType ) const ( @@ -532,14 +532,21 @@ type EventFireResponse struct { QueryMeta } -// TombstoneReapRequest is used to trigger a reaping of the tombstones -type TombstoneReapRequest struct { +type TombstoneOp string + +const ( + TombstoneReap TombstoneOp = "reap" +) + +// TombstoneRequest is used to trigger a reaping of the tombstones +type TombstoneRequest struct { Datacenter string + Op TombstoneOp ReapIndex uint64 WriteRequest } -func (r *TombstoneReapRequest) RequestDatacenter() string { +func (r *TombstoneRequest) RequestDatacenter() string { return r.Datacenter } From 9152fae1098a61ec909f71772f52fe863f510f94 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 15:28:56 -0800 Subject: [PATCH 15/30] consul: First pass at tombstone reaping --- consul/fsm.go | 16 ++++++--- consul/leader.go | 5 +-- consul/state_store.go | 78 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 86 insertions(+), 13 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index a105d2716..0332cf677 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -85,8 +85,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { return c.applySessionOperation(buf[1:], log.Index) case structs.ACLRequestType: return c.applyACLOperation(buf[1:], log.Index) - case structs.TombstoneReapRequestType: - return c.applyTombstoneReapOperation(buf[1:], log.Index) + case structs.TombstoneRequestType: + return c.applyTombstoneOperation(buf[1:], log.Index) default: panic(fmt.Errorf("failed to apply request: %#v", buf)) } @@ -217,12 +217,18 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { } } -func (c *consulFSM) applyTombstoneReapOperation(buf []byte, index uint64) interface{} { - var req structs.TombstoneReapRequest +func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} { + var req structs.TombstoneRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - return nil + switch req.Op { + case structs.TombstoneReap: + return c.state.ReapTombstones(req.ReapIndex) + default: + c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op) + return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op) + } } func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { diff --git a/consul/leader.go b/consul/leader.go index a280fa1d4..df0c46c28 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -536,12 +536,13 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { // through Raft to ensure consistency. We do this outside the leader loop // to avoid blocking. func (s *Server) reapTombstones(index uint64) { - req := structs.TombstoneReapRequest{ + req := structs.TombstoneRequest{ Datacenter: s.config.Datacenter, + Op: structs.TombstoneReap, ReapIndex: index, WriteRequest: structs.WriteRequest{Token: s.config.ACLToken}, } - _, err := s.raftApply(structs.TombstoneReapRequestType, &req) + _, err := s.raftApply(structs.TombstoneRequestType, &req) if err != nil { s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v", index, err) diff --git a/consul/state_store.go b/consul/state_store.go index 10883dd33..2620737e4 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -296,12 +296,6 @@ func (s *StateStore) initialize() error { Unique: true, Fields: []string{"Key"}, }, - "id_prefix": &MDBIndex{ - Virtual: true, - RealIndex: "id", - Fields: []string{"Key"}, - IdxFunc: DefaultIndexPrefixFunc, - }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) @@ -1386,6 +1380,71 @@ func (s *StateStore) kvsSet( return true, tx.Commit() } +// ReapTombstones is used to delete all the tombstones with a ModifyTime +// less than or equal to the given index. This is used to prevent unbounded +// storage growth of the tombstones. +func (s *StateStore) ReapTombstones(index uint64) error { + tx, err := s.tombstoneTable.StartTxn(false, nil) + if err != nil { + return fmt.Errorf("failed to start txn: %v", err) + } + defer tx.Abort() + + // Scan the tombstone table for all the entries that are + // eligble for GC. This could be improved by indexing on + // ModifyTime and doing a less-than-equals scan, however + // we don't currently support numeric indexes internally. + // Luckily, this is a low frequency operation. + var toDelete []string + streamCh := make(chan interface{}, 128) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + for raw := range streamCh { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex <= index { + toDelete = append(toDelete, ent.Key) + } + } + }() + if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil { + s.logger.Printf("[ERR] consul.state: Failed to scan tombstones: %v", err) + return fmt.Errorf("failed to scan tombstones: %v", err) + } + + // Delete each tombstone + if len(toDelete) > 0 { + s.logger.Printf("[DEBUG] consul.state: Reaping %d tombstones", len(toDelete)) + } + for _, key := range toDelete { + num, err := s.tombstoneTable.DeleteTxn(tx, "id", key) + if err != nil { + s.logger.Printf("[ERR] consul.state: Failed to delete tombstone: %v", err) + return fmt.Errorf("failed to delete tombstone: %v", err) + } + if num != 1 { + return fmt.Errorf("failed to delete tombstone '%s'", key) + } + } + return tx.Commit() +} + +// TombstoneRestore is used to restore a tombstone. +// It should only be used when doing a restore. +func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error { + // Start a new txn + tx, err := s.tombstoneTable.StartTxn(false, nil) + if err != nil { + return err + } + defer tx.Abort() + + if err := s.tombstoneTable.InsertTxn(tx, d); err != nil { + return err + } + return tx.Commit() +} + // SessionCreate is used to create a new session. The // ID will be populated on a successful return func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { @@ -1852,6 +1911,13 @@ func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } +// TombstoneDump is used to dump all tombstone entries. It takes a channel and streams +// back *struct.DirEntry objects. This will block and should be invoked +// in a goroutine. +func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error { + return s.store.tombstoneTable.StreamTxn(stream, s.tx, "id") +} + // SessionList is used to list all the open sessions func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { res, err := s.store.sessionTable.GetTxn(s.tx, "id") From f9d322f34623a291b9e090dea8da948451ccbd57 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 15 Dec 2014 15:31:13 -0800 Subject: [PATCH 16/30] consul: Persist tombstones --- consul/fsm.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/consul/fsm.go b/consul/fsm.go index 0332cf677..b6bde95e6 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -317,6 +317,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.TombstoneRequestType: + var req structs.DirEntry + if err := dec.Decode(&req); err != nil { + return err + } + if err := c.state.TombstoneRestore(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -357,6 +366,11 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + + if err := s.persistTombstones(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -462,6 +476,33 @@ func (s *consulSnapshot) persistKV(sink raft.SnapshotSink, } } +func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + streamCh := make(chan interface{}, 256) + errorCh := make(chan error) + go func() { + if err := s.state.TombstoneDump(streamCh); err != nil { + errorCh <- err + } + }() + + for { + select { + case raw := <-streamCh: + if raw == nil { + return nil + } + sink.Write([]byte{byte(structs.TombstoneRequestType)}) + if err := encoder.Encode(raw); err != nil { + return err + } + + case err := <-errorCh: + return err + } + } +} + func (s *consulSnapshot) Release() { s.state.Close() } From bba573dfbcff68ab7ea58ed5b490e3b26fe391df Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 14:48:43 -0800 Subject: [PATCH 17/30] consul: Test tombstone creation --- consul/state_store.go | 6 ++++++ consul/state_store_test.go | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/consul/state_store.go b/consul/state_store.go index 2620737e4..6a90af3b4 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -296,6 +296,12 @@ func (s *StateStore) initialize() error { Unique: true, Fields: []string{"Key"}, }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 621734ef3..0e5781ce3 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1444,6 +1444,15 @@ func TestKVSDelete(t *testing.T) { t.Fatalf("bad: %v", d) } + // Check tombstone exists + _, res, err := store.tombstoneTable.Get("id", "/foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if res == nil || res[0].(*structs.DirEntry).ModifyIndex != 1020 { + t.Fatalf("bad: %#v", d) + } + // Check that we get a delete select { case idx := <-gc.ExpireCh(): @@ -1801,6 +1810,20 @@ func TestKVSDeleteTree(t *testing.T) { t.Fatalf("bad: %v", ents) } + // Check tombstones exists + _, res, err := store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("bad: %#v", d) + } + for _, r := range res { + if r.(*structs.DirEntry).ModifyIndex != 1010 { + t.Fatalf("bad: %#v", r) + } + } + // Check that we get a delete select { case idx := <-gc.ExpireCh(): From 41886c6af599404203e49fb0137d3bba0d1b0672 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 14:57:34 -0800 Subject: [PATCH 18/30] consul: Testing tombstone reaping --- consul/state_store.go | 1 + consul/state_store_test.go | 90 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/consul/state_store.go b/consul/state_store.go index 6a90af3b4..b60a4419d 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1417,6 +1417,7 @@ func (s *StateStore) ReapTombstones(index uint64) error { s.logger.Printf("[ERR] consul.state: Failed to scan tombstones: %v", err) return fmt.Errorf("failed to scan tombstones: %v", err) } + <-doneCh // Delete each tombstone if len(toDelete) > 0 { diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 0e5781ce3..ea63f4475 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1835,6 +1835,96 @@ func TestKVSDeleteTree(t *testing.T) { } } +func TestReapTombstones(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + ttl := 10 * time.Millisecond + gran := 5 * time.Millisecond + gc, err := NewTombstoneGC(ttl, gran) + if err != nil { + t.Fatalf("err: %v", err) + } + store.gc = gc + + // Should not exist + err = store.KVSDeleteTree(1000, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke just a + err = store.KVSDelete(1010, "/web/a") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the web tree + err = store.KVSDeleteTree(1020, "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Do a reap, should be a noop + if err := store.ReapTombstones(1000); err != nil { + t.Fatalf("err: %v", err) + } + + // Check tombstones exists + _, res, err := store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 3 { + t.Fatalf("bad: %#v", d) + } + + // Do a reap, should remove just /web/a + if err := store.ReapTombstones(1010); err != nil { + t.Fatalf("err: %v", err) + } + + // Check tombstones exists + _, res, err = store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 2 { + t.Fatalf("bad: %#v", d) + } + + // Do a reap, should remove them all + if err := store.ReapTombstones(1025); err != nil { + t.Fatalf("err: %v", err) + } + + // Check no tombstones exists + _, res, err = store.tombstoneTable.Get("id_prefix", "/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 0 { + t.Fatalf("bad: %#v", d) + } +} + func TestSessionCreate(t *testing.T) { store, err := testStateStore() if err != nil { From 91f8ff41eb7f0a4b46ea7b088a16f94e98539d39 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 15:14:25 -0800 Subject: [PATCH 19/30] consul: Testing tombstone snapshot --- consul/state_store.go | 4 +- consul/state_store_test.go | 81 +++++++++++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index b60a4419d..529c8dbf8 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1237,8 +1237,10 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { return err } - if _, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { + if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { return err + } else if num != 1 { + return fmt.Errorf("Failed to delete key '%s'", ent.Key) } } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index ea63f4475..9ba723d11 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -688,23 +688,32 @@ func TestStoreSnapshot(t *testing.T) { if err := store.KVSSet(15, d); err != nil { t.Fatalf("err: %v", err) } + d = &structs.DirEntry{Key: "/web/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(16, d); err != nil { + t.Fatalf("err: %v", err) + } + // Create a tombstone + // TODO: Change to /web/c causes failure? + if err := store.KVSDelete(17, "/web/a"); err != nil { + t.Fatalf("err: %v", err) + } // Add some sessions session := &structs.Session{ID: generateUUID(), Node: "foo"} - if err := store.SessionCreate(16, session); err != nil { + if err := store.SessionCreate(18, session); err != nil { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar"} - if err := store.SessionCreate(17, session); err != nil { + if err := store.SessionCreate(19, session); err != nil { t.Fatalf("err: %v", err) } d.Session = session.ID - if ok, err := store.KVSLock(18, d); err != nil || !ok { + if ok, err := store.KVSLock(20, d); err != nil || !ok { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar", TTL: "60s"} - if err := store.SessionCreate(19, session); err != nil { + if err := store.SessionCreate(21, session); err != nil { t.Fatalf("err: %v", err) } @@ -713,7 +722,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(20, a1); err != nil { + if err := store.ACLSet(21, a1); err != nil { t.Fatalf("err: %v", err) } @@ -722,7 +731,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(21, a2); err != nil { + if err := store.ACLSet(22, a2); err != nil { t.Fatalf("err: %v", err) } @@ -734,7 +743,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 21 { + if idx := snap.LastIndex(); idx != 22 { t.Fatalf("bad: %v", idx) } @@ -786,7 +795,29 @@ func TestStoreSnapshot(t *testing.T) { } <-doneCh if len(ents) != 2 { - t.Fatalf("missing KVS entries!") + t.Fatalf("missing KVS entries! %#v", ents) + } + + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") } // Check there are 3 sessions @@ -818,13 +849,13 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! - if err := store.EnsureService(22, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { + if err := store.EnsureService(23, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService(23, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { + if err := store.EnsureService(24, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(24, structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(25, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -834,16 +865,16 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(26, checkAfter); err != nil { + if err := store.EnsureCheck(27, checkAfter); err != nil { t.Fatalf("err: %v", err) } - if err := store.KVSDelete(26, "/web/b"); err != nil { + if err := store.KVSDelete(28, "/web/b"); err != nil { t.Fatalf("err: %v", err) } // Nuke an ACL - if err := store.ACLDelete(27, a1.ID); err != nil { + if err := store.ACLDelete(29, a1.ID); err != nil { t.Fatalf("err: %v", err) } @@ -897,6 +928,28 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") + } + // Check there are 3 sessions sessions, err = snap.SessionList() if err != nil { From 7736e701ca1d053de2277a450c64748fc15097aa Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 15:19:35 -0800 Subject: [PATCH 20/30] consul: Test FSM restore of tombstones --- consul/fsm_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 9db2c8c3a..8de6db7b0 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -357,6 +357,12 @@ func TestFSM_SnapshotRestore(t *testing.T) { acl := &structs.ACL{ID: generateUUID(), Name: "User Token"} fsm.state.ACLSet(10, acl) + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -446,6 +452,15 @@ func TestFSM_SnapshotRestore(t *testing.T) { if idx <= 1 { t.Fatalf("bad index: %d", idx) } + + // Verify tombstones are restored + _, res, err := fsm.state.tombstoneTable.Get("id", "/remove") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 1 { + t.Fatalf("bad: %v", res) + } } func TestFSM_KVSSet(t *testing.T) { From b1fefa6d9047a704214dad873912f877ae99e08a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 15:22:10 -0800 Subject: [PATCH 21/30] consul: Test FSM Reap operations --- consul/fsm_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 8de6db7b0..3a069ff46 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -940,3 +940,46 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { t.Fatalf("should be destroyed") } } + +func TestFSM_TombstoneReap(t *testing.T) { + path, err := ioutil.TempDir("", "fsm") + if err != nil { + t.Fatalf("err: %v", err) + } + fsm, err := NewFSM(nil, path, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + defer fsm.Close() + + // Create some tombstones + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + + // Create a new reap request + req := structs.TombstoneRequest{ + Datacenter: "dc1", + Op: structs.TombstoneReap, + ReapIndex: 12, + } + buf, err := structs.Encode(structs.TombstoneRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if err, ok := resp.(error); ok { + t.Fatalf("resp: %v", err) + } + + // Verify the tombstones are gone + _, res, err := fsm.state.tombstoneTable.Get("id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) != 0 { + t.Fatalf("bad: %v", res) + } +} From e2bfaa11a20f3e7d64ae67af2f3aaf1c6b3e3f03 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:02:08 -0800 Subject: [PATCH 22/30] consul: Testing leader issue of reap command --- consul/leader_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++ consul/state_store.go | 1 + 2 files changed, 50 insertions(+) diff --git a/consul/leader_test.go b/consul/leader_test.go index b41ac9755..7c51dc32a 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -511,3 +511,52 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) { t.Fatalf("should have pending expiration") }) } + +func TestLeader_ReapTombstones(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.TombstoneTTL = 50 * time.Millisecond + c.TombstoneTTLGranularity = 10 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + client := rpcClient(t, s1) + testutil.WaitForLeader(t, client.Call, "dc1") + + // Create a KV entry + arg := structs.KVSRequest{ + Datacenter: "dc1", + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: "test", + Value: []byte("test"), + }, + } + var out bool + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Delete the KV entry (tombstoned) + arg.Op = structs.KVSDelete + if err := client.Call("KVS.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure we have a tombstone + _, res, err := s1.fsm.State().tombstoneTable.Get("id") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(res) == 0 { + t.Fatalf("missing tombstones") + } + + // Check that the new leader has a pending GC expiration + testutil.WaitForResult(func() (bool, error) { + _, res, err := s1.fsm.State().tombstoneTable.Get("id") + return len(res) == 0, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/consul/state_store.go b/consul/state_store.go index 529c8dbf8..a82d184b8 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -132,6 +132,7 @@ func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*Stat env: env, watch: make(map[*MDBTable]*NotifyGroup), lockDelay: make(map[string]time.Time), + gc: gc, } // Ensure we can initialize From a350ec93792a2e5b051fbfdc4dd0b7ae25a15e6c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:09:02 -0800 Subject: [PATCH 23/30] consul: Mesure time for reapTombstones --- consul/leader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/consul/leader.go b/consul/leader.go index df0c46c28..deaeccd08 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -536,6 +536,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { // through Raft to ensure consistency. We do this outside the leader loop // to avoid blocking. func (s *Server) reapTombstones(index uint64) { + defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now()) req := structs.TombstoneRequest{ Datacenter: s.config.Datacenter, Op: structs.TombstoneReap, From b70dac1a62049906326898809b5164f0adad0446 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:27:46 -0800 Subject: [PATCH 24/30] consul: Ensure KVS List handles tombstones --- consul/kvs_endpoint.go | 42 ++++++++++++++------------- consul/state_store.go | 32 +++++++++++++++++++-- consul/state_store_test.go | 58 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 107 insertions(+), 25 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 53ed238be..622cb110a 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -135,32 +135,36 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e &reply.QueryMeta, state.QueryTables("KVSList"), func() error { - index, ent, err := state.KVSList(args.Key) + tombIndex, index, ent, err := state.KVSList(args.Key) if err != nil { return err } if acl != nil { ent = FilterDirEnt(acl, ent) } - if len(ent) == 0 { - // Must provide non-zero index to prevent blocking - // Index 1 is impossible anyways (due to Raft internals) - if index == 0 { - reply.Index = 1 - } else { - reply.Index = index - } - reply.Entries = nil - } else { - // Determine the maximum affected index - var maxIndex uint64 - for _, e := range ent { - if e.ModifyIndex > maxIndex { - maxIndex = e.ModifyIndex - } - } - reply.Index = maxIndex + // Determine the maximum affected index + var maxIndex uint64 + for _, e := range ent { + if e.ModifyIndex > maxIndex { + maxIndex = e.ModifyIndex + } + } + if tombIndex > maxIndex { + maxIndex = tombIndex + } + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if maxIndex == 0 { + if index > 0 { + maxIndex = index + } else { + maxIndex = 1 + } + } + reply.Index = maxIndex + + if len(ent) != 0 { reply.Entries = ent } return nil diff --git a/consul/state_store.go b/consul/state_store.go index a82d184b8..32d4b889a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1115,13 +1115,39 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { } // KVSList is used to list all KV entries with a prefix -func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) { - idx, res, err := s.kvsTable.Get("id_prefix", prefix) +func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) { + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) + if err != nil { + return 0, 0, nil, err + } + defer tx.Abort() + + idx, err := tables.LastIndexTxn(tx) + if err != nil { + return 0, 0, nil, err + } + + res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix) + if err != nil { + return 0, 0, nil, err + } ents := make(structs.DirEntries, len(res)) for idx, r := range res { ents[idx] = r.(*structs.DirEntry) } - return idx, ents, err + + // Check for the higest index in the tombstone table + var maxIndex uint64 + res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix) + for _, r := range res { + ent := r.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } + + return maxIndex, idx, ents, err } // KVSListKeys is used to list keys with a prefix, and up to a given seperator diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 9ba723d11..54d9ffa42 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1588,7 +1588,7 @@ func TestKVS_List(t *testing.T) { defer store.Close() // Should not exist - idx, ents, err := store.KVSList("/web") + _, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1614,7 +1614,7 @@ func TestKVS_List(t *testing.T) { } // Should list - idx, ents, err = store.KVSList("/web") + _, idx, ents, err = store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } @@ -1636,6 +1636,55 @@ func TestKVS_List(t *testing.T) { } } +func TestKVSList_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/web/a", Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/b", Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/web/c", Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + + // Nuke the last node + err = store.KVSDeleteTree(1003, "/web/c") + if err != nil { + t.Fatalf("err: %v", err) + } + + // Add another node + d = &structs.DirEntry{Key: "/other", Value: []byte("test")} + if err := store.KVSSet(1004, d); err != nil { + t.Fatalf("err: %v", err) + } + + // List should properly reflect tombstoned value + tombIdx, idx, ents, err := store.KVSList("/web") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if tombIdx != 1003 { + t.Fatalf("bad: %v", idx) + } + if len(ents) != 2 { + t.Fatalf("bad: %v", ents) + } +} + func TestKVS_ListKeys(t *testing.T) { store, err := testStateStore() if err != nil { @@ -1852,13 +1901,16 @@ func TestKVSDeleteTree(t *testing.T) { } // Nothing should list - idx, ents, err := store.KVSList("/web") + tombIdx, idx, ents, err := store.KVSList("/web") if err != nil { t.Fatalf("err: %v", err) } if idx != 1010 { t.Fatalf("bad: %v", idx) } + if tombIdx != 1010 { + t.Fatalf("bad: %v", idx) + } if len(ents) != 0 { t.Fatalf("bad: %v", ents) } From 7a4b53256438a9d187167343fce59d36125d6a77 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:37:45 -0800 Subject: [PATCH 25/30] consul: List Keys should handle tombstones --- consul/state_store.go | 21 +++++++++++-- consul/state_store_test.go | 62 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 32d4b889a..feeec3ca0 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1152,7 +1152,8 @@ func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, // KVSListKeys is used to list keys with a prefix, and up to a given seperator func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { - tx, err := s.kvsTable.StartTxn(true, nil) + tables := MDBTables{s.kvsTable, s.tombstoneTable} + tx, err := tables.StartTxn(true) if err != nil { return 0, nil, err } @@ -1172,6 +1173,7 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er // Aggregate the stream stream := make(chan interface{}, 128) + streamTomb := make(chan interface{}, 128) done := make(chan struct{}) var keys []string var maxIndex uint64 @@ -1205,18 +1207,31 @@ func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, er keys = append(keys, ent.Key) } } + + // Handle the tombstones for any index updates + for raw := range streamTomb { + ent := raw.(*structs.DirEntry) + if ent.ModifyIndex > maxIndex { + maxIndex = ent.ModifyIndex + } + } close(done) }() // Start the stream, and wait for completion - err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix) + if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } + if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil { + return 0, nil, err + } <-done // Use the maxIndex if we have any keys if maxIndex != 0 { idx = maxIndex } - return idx, keys, err + return idx, keys, nil } // KVSDelete is used to delete a KVS entry diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 54d9ffa42..0ad29c370 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1859,6 +1859,68 @@ func TestKVS_ListKeys_Index(t *testing.T) { } } +func TestKVS_ListKeys_TombstoneIndex(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + // Create the entries + d := &structs.DirEntry{Key: "/foo/a", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1000, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/bar/b", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1001, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/baz/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1002, d); err != nil { + t.Fatalf("err: %v", err) + } + d = &structs.DirEntry{Key: "/other/d", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(1003, d); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.KVSDelete(1004, "/baz/c"); err != nil { + t.Fatalf("err: %v", err) + } + + idx, keys, err := store.KVSListKeys("/foo", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1000 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/ba", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 1 { + t.Fatalf("bad: %v", keys) + } + + idx, keys, err = store.KVSListKeys("/nope", "") + if err != nil { + t.Fatalf("err: %v", err) + } + if idx != 1004 { + t.Fatalf("bad: %v", idx) + } + if len(keys) != 0 { + t.Fatalf("bad: %v", keys) + } +} + func TestKVSDeleteTree(t *testing.T) { store, err := testStateStore() if err != nil { From bf40a2ac1f6b265e09cf0a45a82653dc6f201a27 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 16:50:15 -0800 Subject: [PATCH 26/30] consul: Reverting some index compute logic --- consul/kvs_endpoint.go | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 622cb110a..e4b5a36b6 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -143,28 +143,26 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e ent = FilterDirEnt(acl, ent) } - // Determine the maximum affected index - var maxIndex uint64 - for _, e := range ent { - if e.ModifyIndex > maxIndex { - maxIndex = e.ModifyIndex - } - } - if tombIndex > maxIndex { - maxIndex = tombIndex - } - // Must provide non-zero index to prevent blocking - // Index 1 is impossible anyways (due to Raft internals) - if maxIndex == 0 { - if index > 0 { - maxIndex = index + if len(ent) == 0 { + // Must provide non-zero index to prevent blocking + // Index 1 is impossible anyways (due to Raft internals) + if index == 0 { + reply.Index = 1 } else { - maxIndex = 1 + reply.Index = index } - } - reply.Index = maxIndex - - if len(ent) != 0 { + } else { + // Determine the maximum affected index + var maxIndex uint64 + for _, e := range ent { + if e.ModifyIndex > maxIndex { + maxIndex = e.ModifyIndex + } + } + if tombIndex > maxIndex { + maxIndex = tombIndex + } + reply.Index = maxIndex reply.Entries = ent } return nil From 4f6f5ae6f0d6a7b32d962e99d01eba13c4c0d8e9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 17:02:43 -0800 Subject: [PATCH 27/30] consul: Improve log message --- consul/state_store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index feeec3ca0..825c0131a 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1458,19 +1458,19 @@ func (s *StateStore) ReapTombstones(index uint64) error { } }() if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil { - s.logger.Printf("[ERR] consul.state: Failed to scan tombstones: %v", err) + s.logger.Printf("[ERR] consul.state: failed to scan tombstones: %v", err) return fmt.Errorf("failed to scan tombstones: %v", err) } <-doneCh // Delete each tombstone if len(toDelete) > 0 { - s.logger.Printf("[DEBUG] consul.state: Reaping %d tombstones", len(toDelete)) + s.logger.Printf("[DEBUG] consul.state: reaping %d tombstones up to %d", len(toDelete), index) } for _, key := range toDelete { num, err := s.tombstoneTable.DeleteTxn(tx, "id", key) if err != nil { - s.logger.Printf("[ERR] consul.state: Failed to delete tombstone: %v", err) + s.logger.Printf("[ERR] consul.state: failed to delete tombstone: %v", err) return fmt.Errorf("failed to delete tombstone: %v", err) } if num != 1 { From 200b348f69cf7706f1a2fd8c1111dc1f17921430 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 5 Jan 2015 14:58:59 -0800 Subject: [PATCH 28/30] consul: Disable tombstones as follower --- consul/leader.go | 24 ++++++++++++++---- consul/tombstone_gc.go | 50 ++++++++++++++++++++++++++----------- consul/tombstone_gc_test.go | 4 ++- 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index deaeccd08..c08711204 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -50,16 +50,15 @@ func (s *Server) monitorLeadership() { // leaderLoop runs as long as we are the leader to run various // maintence activities func (s *Server) leaderLoop(stopCh chan struct{}) { + // Ensure we revoke leadership on stepdown + defer s.revokeLeadership() + // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) } - // Clear the session timers on either shutdown or step down, since we - // are no longer responsible for session expirations. - defer s.clearAllSessionTimers() - // Reconcile channel is only used once initial reconcile // has succeeded var reconcileCh chan serf.Member @@ -126,7 +125,7 @@ func (s *Server) establishLeadership() error { // Hint the tombstone expiration timer. When we freshly establish leadership // we become the authoritative timer, and so we need to start the clock // on any pending GC events. - s.tombstoneGC.Reset() + s.tombstoneGC.SetEnabled(true) lastIndex := s.raft.LastIndex() s.tombstoneGC.Hint(lastIndex) s.logger.Printf("[DEBUG] consul: reset tombstone GC to index %d", lastIndex) @@ -154,6 +153,21 @@ func (s *Server) establishLeadership() error { return nil } +// revokeLeadership is invoked once we step down as leader. +// This is used to cleanup any state that may be specific to a leader. +func (s *Server) revokeLeadership() error { + // Disable the tombstone GC, since it is only useful as a leader + s.tombstoneGC.SetEnabled(false) + + // Clear the session timers on either shutdown or step down, since we + // are no longer responsible for session expirations. + if err := s.clearAllSessionTimers(); err != nil { + s.logger.Printf("[ERR] consul: Clearing session timers failed: %v", err) + return err + } + return nil +} + // initializeACL is used to setup the ACLs if we are the leader // and need to do this. func (s *Server) initializeACL() error { diff --git a/consul/tombstone_gc.go b/consul/tombstone_gc.go index 0d0da74ad..8a238409c 100644 --- a/consul/tombstone_gc.go +++ b/consul/tombstone_gc.go @@ -23,13 +23,18 @@ type TombstoneGC struct { ttl time.Duration granularity time.Duration + // enabled controls if we actually setup any timers. + enabled bool + // expires maps the time of expiration to the highest // tombstone value that should be expired. - expires map[time.Time]*expireInterval - expiresLock sync.Mutex + expires map[time.Time]*expireInterval // expireCh is used to stream expiration expireCh chan uint64 + + // lock is used to ensure safe access to all the fields + lock sync.Mutex } // expireInterval is used to track the maximum index @@ -53,6 +58,7 @@ func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) { t := &TombstoneGC{ ttl: ttl, granularity: granularity, + enabled: false, expires: make(map[time.Time]*expireInterval), expireCh: make(chan uint64, 1), } @@ -65,14 +71,25 @@ func (t *TombstoneGC) ExpireCh() <-chan uint64 { return t.expireCh } -// Reset is used to clear the TTL timers -func (t *TombstoneGC) Reset() { - t.expiresLock.Lock() - defer t.expiresLock.Unlock() - for _, exp := range t.expires { - exp.timer.Stop() +// SetEnabled is used to control if the tombstone GC is +// enabled. Should only be enabled by the leader node. +func (t *TombstoneGC) SetEnabled(enabled bool) { + t.lock.Lock() + defer t.lock.Unlock() + if enabled == t.enabled { + return } - t.expires = make(map[time.Time]*expireInterval) + + // Stop all the timers and clear + if !enabled { + for _, exp := range t.expires { + exp.timer.Stop() + } + t.expires = make(map[time.Time]*expireInterval) + } + + // Update the status + t.enabled = enabled } // Hint is used to indicate that keys at the given index have been @@ -80,8 +97,11 @@ func (t *TombstoneGC) Reset() { func (t *TombstoneGC) Hint(index uint64) { expires := t.nextExpires() - t.expiresLock.Lock() - defer t.expiresLock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() + if !t.enabled { + return + } // Check for an existing expiration timer exp, ok := t.expires[expires] @@ -104,8 +124,8 @@ func (t *TombstoneGC) Hint(index uint64) { // PendingExpiration is used to check if any expirations are pending func (t *TombstoneGC) PendingExpiration() bool { - t.expiresLock.Lock() - defer t.expiresLock.Unlock() + t.lock.Lock() + defer t.lock.Unlock() return len(t.expires) > 0 } @@ -120,10 +140,10 @@ func (t *TombstoneGC) nextExpires() time.Time { // expireTime is used to expire the entries at the given time func (t *TombstoneGC) expireTime(expires time.Time) { // Get the maximum index and clear the entry - t.expiresLock.Lock() + t.lock.Lock() exp := t.expires[expires] delete(t.expires, expires) - t.expiresLock.Unlock() + t.lock.Unlock() // Notify the expires channel t.expireCh <- exp.maxIndex diff --git a/consul/tombstone_gc_test.go b/consul/tombstone_gc_test.go index 6a425a596..a9014fa10 100644 --- a/consul/tombstone_gc_test.go +++ b/consul/tombstone_gc_test.go @@ -29,6 +29,7 @@ func TestTombstoneGC(t *testing.T) { if err != nil { t.Fatalf("should fail") } + gc.SetEnabled(true) if gc.PendingExpiration() { t.Fatalf("should not be pending") @@ -82,13 +83,14 @@ func TestTombstoneGC_Expire(t *testing.T) { if err != nil { t.Fatalf("should fail") } + gc.SetEnabled(true) if gc.PendingExpiration() { t.Fatalf("should not be pending") } gc.Hint(100) - gc.Reset() + gc.SetEnabled(false) if gc.PendingExpiration() { t.Fatalf("should not be pending") From 5ad16ca00f379fb65cca914586d52b45ec000b90 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 5 Jan 2015 15:06:53 -0800 Subject: [PATCH 29/30] consul: Fixing the KVS tests --- consul/state_store_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 0ad29c370..c115939f0 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1472,6 +1472,7 @@ func TestKVSDelete(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + gc.SetEnabled(true) store.gc = gc // Create the entry @@ -1934,6 +1935,7 @@ func TestKVSDeleteTree(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + gc.SetEnabled(true) store.gc = gc // Should not exist @@ -2015,6 +2017,7 @@ func TestReapTombstones(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + gc.SetEnabled(true) store.gc = gc // Should not exist From 8eaee53661425ac0729a6e4ee9d17072a16dfacc Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 5 Jan 2015 15:13:39 -0800 Subject: [PATCH 30/30] consul: Adding more useful metrics --- consul/fsm.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/consul/fsm.go b/consul/fsm.go index b6bde95e6..7980b9885 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -8,6 +8,7 @@ import ( "log" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/raft" @@ -101,6 +102,7 @@ func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} { } func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now()) // Apply all updates in a single transaction if err := c.state.EnsureRegistration(index, req); err != nil { c.logger.Printf("[INFO] consul.fsm: EnsureRegistration failed: %v", err) @@ -110,6 +112,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) in } func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now()) var req structs.DeregisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) @@ -140,6 +143,7 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now()) switch req.Op { case structs.KVSSet: return c.state.KVSSet(index, &req.DirEnt) @@ -180,6 +184,7 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now()) switch req.Op { case structs.SessionCreate: if err := c.state.SessionCreate(index, &req.Session); err != nil { @@ -200,6 +205,7 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} { if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now()) switch req.Op { case structs.ACLForceSet: fallthrough @@ -222,6 +228,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{ if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now()) switch req.Op { case structs.TombstoneReap: return c.state.ReapTombstones(req.ReapIndex) @@ -335,6 +342,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { } func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { + defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) // Register the nodes encoder := codec.NewEncoder(sink, msgpackHandle)