Nukes old state store's connection to FSM and RPC.

This commit is contained in:
James Phillips 2015-10-12 20:56:31 -07:00
parent 450886246d
commit 3f07f0685f
5 changed files with 27 additions and 292 deletions

View file

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"time"
@ -26,7 +25,6 @@ type consulFSM struct {
logger *log.Logger
path string
stateNew *state.StateStore
state *StateStore
gc *state.TombstoneGC
}
@ -34,7 +32,6 @@ type consulFSM struct {
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type consulSnapshot struct {
state *StateSnapshot
stateNew *state.StateSnapshot
}
@ -46,51 +43,26 @@ type snapshotHeader struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(gc *state.TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create the state store.
func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
stateNew, err := state.NewStateStore(gc)
if err != nil {
return nil, err
}
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(path, "state")
if err != nil {
return nil, err
}
// Create a state store
state, err := NewStateStorePath(gc, tmpPath, logOutput)
if err != nil {
return nil, err
}
fsm := &consulFSM{
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
stateNew: stateNew,
state: state,
gc: gc,
}
return fsm, nil
}
// Close is used to cleanup resources associated with the FSM
func (c *consulFSM) Close() error {
return c.state.Close()
}
// StateNew is used to return a handle to the current state
func (c *consulFSM) StateNew() *state.StateStore {
return c.stateNew
}
// State is used to return a handle to the current state
func (c *consulFSM) State() *StateStore {
return c.state
}
func (c *consulFSM) Apply(log *raft.Log) interface{} {
buf := log.Data
msgType := structs.MessageType(buf[0])
@ -282,30 +254,18 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
}(time.Now())
// Create a new snapshot
snap, err := c.state.Snapshot()
if err != nil {
return nil, err
}
return &consulSnapshot{snap, c.stateNew.Snapshot()}, nil
return &consulSnapshot{c.stateNew.Snapshot()}, nil
}
func (c *consulFSM) Restore(old io.ReadCloser) error {
defer old.Close()
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(c.path, "state")
if err != nil {
return err
}
// Create a new state store
store, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
stateNew, err := state.NewStateStore(c.gc)
if err != nil {
return err
}
c.state.Close()
c.state = store
c.stateNew = stateNew
// Create a decoder
dec := codec.NewDecoder(old, msgpackHandle)
@ -390,6 +350,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
// Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle)
@ -557,6 +518,5 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
}
func (s *consulSnapshot) Release() {
s.state.Close()
s.stateNew.Close()
}

View file

@ -2,7 +2,6 @@ package consul
import (
"bytes"
"io/ioutil"
"os"
"testing"
@ -38,16 +37,10 @@ func makeLog(buf []byte) *raft.Log {
}
func TestFSM_RegisterNode(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -87,16 +80,10 @@ func TestFSM_RegisterNode(t *testing.T) {
}
func TestFSM_RegisterNode_Service(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -155,16 +142,10 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
func TestFSM_DeregisterService(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -222,16 +203,10 @@ func TestFSM_DeregisterService(t *testing.T) {
}
func TestFSM_DeregisterCheck(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -289,16 +264,10 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
func TestFSM_DeregisterNode(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.RegisterRequest{
Datacenter: "dc1",
@ -371,16 +340,10 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
func TestFSM_SnapshotRestore(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Add some state
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
@ -433,11 +396,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Try to restore on a new FSM
fsm2, err := NewFSM(nil, path, os.Stderr)
fsm2, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm2.Close()
// Do a restore
if err := fsm2.Restore(sink); err != nil {
@ -519,16 +481,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
func TestFSM_KVSSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -559,16 +515,10 @@ func TestFSM_KVSSet(t *testing.T) {
}
func TestFSM_KVSDelete(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -610,16 +560,10 @@ func TestFSM_KVSDelete(t *testing.T) {
}
func TestFSM_KVSDeleteTree(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -662,16 +606,10 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
}
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -723,16 +661,10 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
}
func TestFSM_KVSCheckAndSet(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
@ -785,16 +717,10 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.stateNew.EnsureCheck(2, &structs.HealthCheck{
@ -870,16 +796,10 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
}
func TestFSM_KVSLock(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
@ -920,16 +840,10 @@ func TestFSM_KVSLock(t *testing.T) {
}
func TestFSM_KVSUnlock(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
@ -988,16 +902,10 @@ func TestFSM_KVSUnlock(t *testing.T) {
}
func TestFSM_ACL_Set_Delete(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create a new ACL
req := structs.ACLRequest{
@ -1066,16 +974,10 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
}
func TestFSM_TombstoneReap(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create some tombstones
fsm.stateNew.KVSSet(11, &structs.DirEntry{
@ -1117,16 +1019,10 @@ func TestFSM_TombstoneReap(t *testing.T) {
}
func TestFSM_IgnoreUnknown(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer os.RemoveAll(path)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
// Create a new reap request
type UnknownRequest struct {

View file

@ -1,7 +1,6 @@
package consul
import (
"io/ioutil"
"os"
"reflect"
"testing"
@ -11,15 +10,10 @@ import (
// Testing for GH-300 and GH-279
func TestHealthCheckRace(t *testing.T) {
path, err := ioutil.TempDir("", "fsm")
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
state := fsm.StateNew()
req := structs.RegisterRequest{

View file

@ -297,108 +297,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
return future.Response(), nil
}
// blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error {
opts := blockingRPCOptions{
queryOpts: b,
queryMeta: m,
tables: tables,
run: run,
}
return s.blockingRPCOpt(&opts)
}
// blockingRPCOptions is used to parameterize blockingRPCOpt since
// it takes so many options. It should be preferred over blockingRPC.
type blockingRPCOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
tables MDBTables
kvWatch bool
kvPrefix string
run func() error
}
// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be preferred over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout *time.Timer
var notifyCh chan struct{}
var state *StateStore
// Fast path non-blocking
if opts.queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}
// Sanity check that we have tables to block on
if len(opts.tables) == 0 && !opts.kvWatch {
panic("no tables to block on")
}
// Restrict the max query time, and ensure there is always one
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
} else if opts.queryOpts.MaxQueryTime <= 0 {
opts.queryOpts.MaxQueryTime = defaultQueryTime
}
// Apply a small amount of jitter to the request
opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction)
// Setup a query timeout
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
// Setup the notify channel
notifyCh = make(chan struct{}, 1)
// Ensure we tear down any watchers on return
state = s.fsm.State()
defer func() {
timeout.Stop()
state.StopWatch(opts.tables, notifyCh)
if opts.kvWatch {
state.StopWatchKV(opts.kvPrefix, notifyCh)
}
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
RUN_QUERY:
// Update the query meta data
s.setQueryMeta(opts.queryMeta)
// Check if query must be consistent
if opts.queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}
// Run the query function
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
err := opts.run()
// Check for minimum query time
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY
case <-timeout.C:
}
}
return err
}
// TODO(slackpad)
// blockingRPCNew is used for queries that need to wait for a minimum index. This
// is used to block and wait for changes.
func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
watch state.Watch, run func() error) error {
var timeout *time.Timer

View file

@ -34,7 +34,6 @@ const (
serfLANSnapshot = "serf/local.snapshot"
serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/"
tmpStatePath = "tmp/"
snapshotsRetained = 2
// serverRPCCache controls how long we keep an idle connection
@ -317,18 +316,9 @@ func (s *Server) setupRaft() error {
s.config.RaftConfig.EnableSingleNode = true
}
// Create the base state path
statePath := filepath.Join(s.config.DataDir, tmpStatePath)
if err := os.RemoveAll(statePath); err != nil {
return err
}
if err := ensurePath(statePath, true); err != nil {
return err
}
// Create the FSM
var err error
s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput)
if err != nil {
return err
}
@ -491,11 +481,6 @@ func (s *Server) Shutdown() error {
// Close the connection pool
s.connPool.Shutdown()
// Close the fsm
if s.fsm != nil {
s.fsm.Close()
}
return nil
}