diff --git a/consul/fsm.go b/consul/fsm.go index bdd2385c1..93f28ea78 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "time" @@ -26,7 +25,6 @@ type consulFSM struct { logger *log.Logger path string stateNew *state.StateStore - state *StateStore gc *state.TombstoneGC } @@ -34,7 +32,6 @@ type consulFSM struct { // state in a way that can be accessed concurrently with operations // that may modify the live state. type consulSnapshot struct { - state *StateSnapshot stateNew *state.StateSnapshot } @@ -46,51 +43,26 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(gc *state.TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) { - // Create the state store. +func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) { stateNew, err := state.NewStateStore(gc) if err != nil { return nil, err } - // Create a temporary path for the state store - tmpPath, err := ioutil.TempDir(path, "state") - if err != nil { - return nil, err - } - - // Create a state store - state, err := NewStateStorePath(gc, tmpPath, logOutput) - if err != nil { - return nil, err - } - fsm := &consulFSM{ logOutput: logOutput, logger: log.New(logOutput, "", log.LstdFlags), - path: path, stateNew: stateNew, - state: state, gc: gc, } return fsm, nil } -// Close is used to cleanup resources associated with the FSM -func (c *consulFSM) Close() error { - return c.state.Close() -} - // StateNew is used to return a handle to the current state func (c *consulFSM) StateNew() *state.StateStore { return c.stateNew } -// State is used to return a handle to the current state -func (c *consulFSM) State() *StateStore { - return c.state -} - func (c *consulFSM) Apply(log *raft.Log) interface{} { buf := log.Data msgType := structs.MessageType(buf[0]) @@ -282,30 +254,18 @@ func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start)) }(time.Now()) - // Create a new snapshot - snap, err := c.state.Snapshot() - if err != nil { - return nil, err - } - return &consulSnapshot{snap, c.stateNew.Snapshot()}, nil + return &consulSnapshot{c.stateNew.Snapshot()}, nil } func (c *consulFSM) Restore(old io.ReadCloser) error { defer old.Close() - // Create a temporary path for the state store - tmpPath, err := ioutil.TempDir(c.path, "state") - if err != nil { - return err - } - // Create a new state store - store, err := NewStateStorePath(c.gc, tmpPath, c.logOutput) + stateNew, err := state.NewStateStore(c.gc) if err != nil { return err } - c.state.Close() - c.state = store + c.stateNew = stateNew // Create a decoder dec := codec.NewDecoder(old, msgpackHandle) @@ -390,6 +350,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) + // Register the nodes encoder := codec.NewEncoder(sink, msgpackHandle) @@ -557,6 +518,5 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, } func (s *consulSnapshot) Release() { - s.state.Close() s.stateNew.Close() } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 05ffb5e2a..1b4b5c952 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -2,7 +2,6 @@ package consul import ( "bytes" - "io/ioutil" "os" "testing" @@ -38,16 +37,10 @@ func makeLog(buf []byte) *raft.Log { } func TestFSM_RegisterNode(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -87,16 +80,10 @@ func TestFSM_RegisterNode(t *testing.T) { } func TestFSM_RegisterNode_Service(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -155,16 +142,10 @@ func TestFSM_RegisterNode_Service(t *testing.T) { } func TestFSM_DeregisterService(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -222,16 +203,10 @@ func TestFSM_DeregisterService(t *testing.T) { } func TestFSM_DeregisterCheck(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -289,16 +264,10 @@ func TestFSM_DeregisterCheck(t *testing.T) { } func TestFSM_DeregisterNode(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.RegisterRequest{ Datacenter: "dc1", @@ -371,16 +340,10 @@ func TestFSM_DeregisterNode(t *testing.T) { } func TestFSM_SnapshotRestore(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() // Add some state fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) @@ -433,11 +396,10 @@ func TestFSM_SnapshotRestore(t *testing.T) { } // Try to restore on a new FSM - fsm2, err := NewFSM(nil, path, os.Stderr) + fsm2, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer fsm2.Close() // Do a restore if err := fsm2.Restore(sink); err != nil { @@ -519,16 +481,10 @@ func TestFSM_SnapshotRestore(t *testing.T) { } func TestFSM_KVSSet(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.KVSRequest{ Datacenter: "dc1", @@ -559,16 +515,10 @@ func TestFSM_KVSSet(t *testing.T) { } func TestFSM_KVSDelete(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.KVSRequest{ Datacenter: "dc1", @@ -610,16 +560,10 @@ func TestFSM_KVSDelete(t *testing.T) { } func TestFSM_KVSDeleteTree(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.KVSRequest{ Datacenter: "dc1", @@ -662,16 +606,10 @@ func TestFSM_KVSDeleteTree(t *testing.T) { } func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.KVSRequest{ Datacenter: "dc1", @@ -723,16 +661,10 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) { } func TestFSM_KVSCheckAndSet(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() req := structs.KVSRequest{ Datacenter: "dc1", @@ -785,16 +717,10 @@ func TestFSM_KVSCheckAndSet(t *testing.T) { } func TestFSM_SessionCreate_Destroy(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) fsm.stateNew.EnsureCheck(2, &structs.HealthCheck{ @@ -870,16 +796,10 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) { } func TestFSM_KVSLock(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) session := &structs.Session{ID: generateUUID(), Node: "foo"} @@ -920,16 +840,10 @@ func TestFSM_KVSLock(t *testing.T) { } func TestFSM_KVSUnlock(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() fsm.stateNew.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) session := &structs.Session{ID: generateUUID(), Node: "foo"} @@ -988,16 +902,10 @@ func TestFSM_KVSUnlock(t *testing.T) { } func TestFSM_ACL_Set_Delete(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() // Create a new ACL req := structs.ACLRequest{ @@ -1066,16 +974,10 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { } func TestFSM_TombstoneReap(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() // Create some tombstones fsm.stateNew.KVSSet(11, &structs.DirEntry{ @@ -1117,16 +1019,10 @@ func TestFSM_TombstoneReap(t *testing.T) { } func TestFSM_IgnoreUnknown(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - defer os.RemoveAll(path) - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() // Create a new reap request type UnknownRequest struct { diff --git a/consul/issue_test.go b/consul/issue_test.go index 452ff7c78..dd0fa86a8 100644 --- a/consul/issue_test.go +++ b/consul/issue_test.go @@ -1,7 +1,6 @@ package consul import ( - "io/ioutil" "os" "reflect" "testing" @@ -11,15 +10,10 @@ import ( // Testing for GH-300 and GH-279 func TestHealthCheckRace(t *testing.T) { - path, err := ioutil.TempDir("", "fsm") + fsm, err := NewFSM(nil, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } - fsm, err := NewFSM(nil, path, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - defer fsm.Close() state := fsm.StateNew() req := structs.RegisterRequest{ diff --git a/consul/rpc.go b/consul/rpc.go index c4c912ee4..ecf9ed865 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -297,108 +297,8 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, return future.Response(), nil } -// blockingRPC is used for queries that need to wait for a -// minimum index. This is used to block and wait for changes. -func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta, - tables MDBTables, run func() error) error { - opts := blockingRPCOptions{ - queryOpts: b, - queryMeta: m, - tables: tables, - run: run, - } - return s.blockingRPCOpt(&opts) -} - -// blockingRPCOptions is used to parameterize blockingRPCOpt since -// it takes so many options. It should be preferred over blockingRPC. -type blockingRPCOptions struct { - queryOpts *structs.QueryOptions - queryMeta *structs.QueryMeta - tables MDBTables - kvWatch bool - kvPrefix string - run func() error -} - -// blockingRPCOpt is the replacement for blockingRPC as it allows -// for more parameterization easily. It should be preferred over blockingRPC. -func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error { - var timeout *time.Timer - var notifyCh chan struct{} - var state *StateStore - - // Fast path non-blocking - if opts.queryOpts.MinQueryIndex == 0 { - goto RUN_QUERY - } - - // Sanity check that we have tables to block on - if len(opts.tables) == 0 && !opts.kvWatch { - panic("no tables to block on") - } - - // Restrict the max query time, and ensure there is always one - if opts.queryOpts.MaxQueryTime > maxQueryTime { - opts.queryOpts.MaxQueryTime = maxQueryTime - } else if opts.queryOpts.MaxQueryTime <= 0 { - opts.queryOpts.MaxQueryTime = defaultQueryTime - } - - // Apply a small amount of jitter to the request - opts.queryOpts.MaxQueryTime += randomStagger(opts.queryOpts.MaxQueryTime / jitterFraction) - - // Setup a query timeout - timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) - - // Setup the notify channel - notifyCh = make(chan struct{}, 1) - - // Ensure we tear down any watchers on return - state = s.fsm.State() - defer func() { - timeout.Stop() - state.StopWatch(opts.tables, notifyCh) - if opts.kvWatch { - state.StopWatchKV(opts.kvPrefix, notifyCh) - } - }() - -REGISTER_NOTIFY: - // Register the notification channel. This may be done - // multiple times if we have not reached the target wait index. - state.Watch(opts.tables, notifyCh) - if opts.kvWatch { - state.WatchKV(opts.kvPrefix, notifyCh) - } - -RUN_QUERY: - // Update the query meta data - s.setQueryMeta(opts.queryMeta) - - // Check if query must be consistent - if opts.queryOpts.RequireConsistent { - if err := s.consistentRead(); err != nil { - return err - } - } - - // Run the query function - metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1) - err := opts.run() - - // Check for minimum query time - if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { - select { - case <-notifyCh: - goto REGISTER_NOTIFY - case <-timeout.C: - } - } - return err -} - -// TODO(slackpad) +// blockingRPCNew is used for queries that need to wait for a minimum index. This +// is used to block and wait for changes. func (s *Server) blockingRPCNew(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, watch state.Watch, run func() error) error { var timeout *time.Timer diff --git a/consul/server.go b/consul/server.go index 3ff10e31a..528b6f16a 100644 --- a/consul/server.go +++ b/consul/server.go @@ -34,7 +34,6 @@ const ( serfLANSnapshot = "serf/local.snapshot" serfWANSnapshot = "serf/remote.snapshot" raftState = "raft/" - tmpStatePath = "tmp/" snapshotsRetained = 2 // serverRPCCache controls how long we keep an idle connection @@ -317,18 +316,9 @@ func (s *Server) setupRaft() error { s.config.RaftConfig.EnableSingleNode = true } - // Create the base state path - statePath := filepath.Join(s.config.DataDir, tmpStatePath) - if err := os.RemoveAll(statePath); err != nil { - return err - } - if err := ensurePath(statePath, true); err != nil { - return err - } - // Create the FSM var err error - s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput) + s.fsm, err = NewFSM(s.tombstoneGC, s.config.LogOutput) if err != nil { return err } @@ -491,11 +481,6 @@ func (s *Server) Shutdown() error { // Close the connection pool s.connPool.Shutdown() - // Close the fsm - if s.fsm != nil { - s.fsm.Close() - } - return nil }