Adds a facility to notify when restores occur.
This commit is contained in:
parent
e59f398d80
commit
c3a1014fbf
|
@ -330,9 +330,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||
// External code might be calling State(), so we need to synchronize
|
||||
// here to make sure we swap in the new state store atomically.
|
||||
c.stateLock.Lock()
|
||||
stateOld := c.state
|
||||
c.state = stateNew
|
||||
c.stateLock.Unlock()
|
||||
|
||||
// The old state store has been abandoned already since we've replaced
|
||||
// it with an empty one, but we defer telling watchers about it until
|
||||
// the restore is done, so they wake up one we have the latest data.
|
||||
defer stateOld.Abandon()
|
||||
|
||||
// Set up a new restore transaction
|
||||
restore := c.state.Restore()
|
||||
defer restore.Abort()
|
||||
|
|
|
@ -563,6 +563,33 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
if !reflect.DeepEqual(queries[0], &query) {
|
||||
t.Fatalf("bad: %#v", queries[0])
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf = bytes.NewBuffer(nil)
|
||||
sink = &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on the old FSM and make sure it abandons the old state
|
||||
// store.
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
if err := fsm.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-abandonCh:
|
||||
default:
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFSM_KVSSet(t *testing.T) {
|
||||
|
|
|
@ -469,6 +469,10 @@ RUN_QUERY:
|
|||
var ws memdb.WatchSet
|
||||
if queryOpts.MinQueryIndex > 0 {
|
||||
ws = memdb.NewWatchSet()
|
||||
|
||||
// This channel will be closed if a snapshot is restored and the
|
||||
// whole state store is abandoned.
|
||||
ws.Add(s.fsm.State().AbandonCh())
|
||||
}
|
||||
|
||||
// Block up to the timeout if we didn't see anything fresh.
|
||||
|
|
|
@ -50,6 +50,10 @@ type StateStore struct {
|
|||
schema *memdb.DBSchema
|
||||
db *memdb.MemDB
|
||||
|
||||
// abandonCh is used to signal watchers that this state store has been
|
||||
// abandoned (usually during a restore). This is only ever closed.
|
||||
abandonCh chan struct{}
|
||||
|
||||
// tableWatches holds all the full table watches, indexed by table name.
|
||||
tableWatches map[string]*FullTableWatch
|
||||
|
||||
|
@ -118,6 +122,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
|||
s := &StateStore{
|
||||
schema: schema,
|
||||
db: db,
|
||||
abandonCh: make(chan struct{}),
|
||||
tableWatches: tableWatches,
|
||||
kvsWatch: NewPrefixWatchManager(),
|
||||
kvsGraveyard: NewGraveyard(gc),
|
||||
|
@ -175,6 +180,18 @@ func (s *StateRestore) Commit() {
|
|||
s.tx.Commit()
|
||||
}
|
||||
|
||||
// AbandonCh returns a channel you can wait on to know if the state store was
|
||||
// abandoned.
|
||||
func (s *StateStore) AbandonCh() <-chan struct{} {
|
||||
return s.abandonCh
|
||||
}
|
||||
|
||||
// Abandon is used to signal that the given state store has been abandoned.
|
||||
// Calling this more than one time will panic.
|
||||
func (s *StateStore) Abandon() {
|
||||
close(s.abandonCh)
|
||||
}
|
||||
|
||||
// maxIndex is a helper used to retrieve the highest known index
|
||||
// amongst a set of tables in the db.
|
||||
func (s *StateStore) maxIndex(tables ...string) uint64 {
|
||||
|
|
|
@ -164,6 +164,17 @@ func TestStateStore_Restore_Abort(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_Abandon(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
abandonCh := s.AbandonCh()
|
||||
s.Abandon()
|
||||
select {
|
||||
case <-abandonCh:
|
||||
default:
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_maxIndex(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
Loading…
Reference in New Issue