Keeps the old state store state if a restore fails.
This commit is contained in:
parent
418b627f4e
commit
d780c49bac
|
@ -318,29 +318,19 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
|||
return &consulSnapshot{c.state.Snapshot()}, nil
|
||||
}
|
||||
|
||||
// Restore streams in the snapshot and replaces the current state store with a
|
||||
// new one based on the snapshot if all goes OK during the restore.
|
||||
func (c *consulFSM) Restore(old io.ReadCloser) error {
|
||||
defer old.Close()
|
||||
|
||||
// Create a new state store
|
||||
// Create a new state store.
|
||||
stateNew, err := state.NewStateStore(c.gc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// External code might be calling State(), so we need to synchronize
|
||||
// here to make sure we swap in the new state store atomically.
|
||||
c.stateLock.Lock()
|
||||
stateOld := c.state
|
||||
c.state = stateNew
|
||||
c.stateLock.Unlock()
|
||||
|
||||
// The old state store has been abandoned already since we've replaced
|
||||
// it with an empty one, but we defer telling watchers about it until
|
||||
// the restore is done, so they wake up once we have the latest data.
|
||||
defer stateOld.Abandon()
|
||||
|
||||
// Set up a new restore transaction
|
||||
restore := c.state.Restore()
|
||||
restore := stateNew.Restore()
|
||||
defer restore.Abort()
|
||||
|
||||
// Create a decoder
|
||||
|
@ -443,6 +433,18 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||
}
|
||||
|
||||
restore.Commit()
|
||||
|
||||
// External code might be calling State(), so we need to synchronize
|
||||
// here to make sure we swap in the new state store atomically.
|
||||
c.stateLock.Lock()
|
||||
stateOld := c.state
|
||||
c.state = stateNew
|
||||
c.stateLock.Unlock()
|
||||
|
||||
// Signal that the old state store has been abandoned. This is required
|
||||
// because we don't operate on it any more, we just throw it away, so
|
||||
// blocking queries won't see any changes and need to be woken up.
|
||||
stateOld.Abandon()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -592,37 +592,41 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestFSM_KVSSet(t *testing.T) {
|
||||
func TestFSM_BadRestore(t *testing.T) {
|
||||
// Create an FSM with some state.
|
||||
fsm, err := NewFSM(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
|
||||
req := structs.KVSRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: "/test/path",
|
||||
Flags: 0,
|
||||
Value: []byte("test"),
|
||||
},
|
||||
}
|
||||
buf, err := structs.Encode(structs.KVSRequestType, req)
|
||||
if err != nil {
|
||||
// Do a bad restore.
|
||||
buf := bytes.NewBuffer([]byte("bad snapshot"))
|
||||
sink := &MockSink{buf, false}
|
||||
if err := fsm.Restore(sink); err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := fsm.Apply(makeLog(buf))
|
||||
if resp != nil {
|
||||
t.Fatalf("resp: %v", resp)
|
||||
|
||||
// Verify the contents didn't get corrupted.
|
||||
_, nodes, err := fsm.state.Nodes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 1 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "foo" ||
|
||||
nodes[0].Address != "127.0.0.1" ||
|
||||
len(nodes[0].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if d == nil {
|
||||
t.Fatalf("missing")
|
||||
// Verify the old state store didn't get abandoned.
|
||||
select {
|
||||
case <-abandonCh:
|
||||
t.Fatalf("bad")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue