diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..22a4369a6 --- /dev/null +++ b/Makefile @@ -0,0 +1,27 @@ +DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) + +all: deps + @mkdir -p bin/ + @bash --norc -i ./scripts/build.sh + +cov: + gocov test ./... | gocov-html > /tmp/coverage.html + open /tmp/coverage.html + +deps: + go get -d -v ./... + echo $(DEPS) | xargs -n1 go get -d + +test: deps + go list ./... | xargs -n1 go test + +integ: + go list ./... | INTEG_TESTS=yes xargs -n1 go test + +web: + ./scripts/website_run.sh + +web-push: + ./scripts/website_push.sh + +.PNONY: all cov deps integ test web web-push diff --git a/consul/config.go b/consul/config.go new file mode 100644 index 000000000..902c5b171 --- /dev/null +++ b/consul/config.go @@ -0,0 +1,70 @@ +package consul + +import ( + "github.com/hashicorp/memberlist" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "io" + "os" +) + +const ( + DefaultRaftAddr = "0.0.0.0:8300" + DefaultLANSerfPort = 8301 + DefaultWANSerfPort = 8302 +) + +// Config is used to configure the server +type Config struct { + // Datacenter is the datacenter this Consul server represents + Datacenter string + + // DataDir is the directory to store our state in + DataDir string + + // Node name is the name we use to advertise. Defaults to hostname. + NodeName string + + // Bind address for Raft (TCP) + RaftBindAddr string + + // RaftConfig is the configuration used for Raft in the local DC + RaftConfig *raft.Config + + // SerfLocalConfig is the configuration for the local serf + SerfLocalConfig *serf.Config + + // SerfRemoteConfig is the configuration for the remtoe serf + SerfRemoteConfig *serf.Config + + // LogOutput is the location to write logs to. If this is not set, + // logs will go to stderr. + LogOutput io.Writer +} + +// DefaultConfig is used to return a sane default configuration +func DefaultConfig() *Config { + hostname, err := os.Hostname() + if err != nil { + panic(err) + } + + conf := &Config{ + Datacenter: "dc1", + NodeName: hostname, + RaftBindAddr: DefaultRaftAddr, + RaftConfig: raft.DefaultConfig(), + SerfLocalConfig: serf.DefaultConfig(), + SerfRemoteConfig: serf.DefaultConfig(), + } + + // Remote Serf should use the WAN timing, since we are using it + // to communicate between DC's + conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig() + + // Ensure we don't have port conflicts + conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort + conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort + + return conf +} diff --git a/consul/fsm.go b/consul/fsm.go new file mode 100644 index 000000000..d0e84a85f --- /dev/null +++ b/consul/fsm.go @@ -0,0 +1,41 @@ +package consul + +import ( + "github.com/hashicorp/raft" + "io" +) + +// consulFSM implements a finite state machine that is used +// along with Raft to provide strong consistency for various +// data that requires it. We implement this outside the Server +// to avoid exposing this outside the package. +type consulFSM struct { + server *Server +} + +// consulSnapshot is used to provide a snapshot of the current +// state in a way that can be accessed concurrently with operations +// that may modify the live state. +type consulSnapshot struct { + fsm *consulFSM +} + +func (c *consulFSM) Apply([]byte) interface{} { + return nil +} + +func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { + snap := &consulSnapshot{fsm: c} + return snap, nil +} + +func (c *consulFSM) Restore(io.ReadCloser) error { + return nil +} + +func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { + return nil +} + +func (s *consulSnapshot) Release() { +} diff --git a/consul/server.go b/consul/server.go new file mode 100644 index 000000000..59c3023a9 --- /dev/null +++ b/consul/server.go @@ -0,0 +1,223 @@ +package consul + +import ( + "fmt" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "log" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + serfLocalSnapshot = "serf/local.snapshot" + serfRemoteSnapshot = "serf/remote.snapshot" + raftState = "raft/" +) + +// Server is Consul server which manages the service discovery, +// health checking, DC forwarding, Raft, and multiple Serf pools. +type Server struct { + config *Config + + // eventChLocal is used to receive events from the + // serfLocal cluster + eventChLocal chan serf.Event + + // eventChRemote is used to receive events from the + // serfRemote cluster + eventChRemote chan serf.Event + + // fsm is the state machine used with Raft to provide + // strong consistency. + fsm *consulFSM + + // Logger uses the provided LogOutput + logger *log.Logger + + // The raft instance is used among Consul nodes within the + // DC to protect operations that require strong consistency + raft *raft.Raft + raftStore *raft.SQLiteStore + raftTransport *raft.NetworkTransport + + // serfLocal is the Serf cluster maintained inside the DC + // which contains all the DC nodes + serfLocal *serf.Serf + + // serfRemote is the Serf cluster maintained between DC's + // which SHOULD only consist of Consul servers + serfRemote *serf.Serf + + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex +} + +// NewServer is used to construct a new Consul server from the +// configuration, potentially returning an error +func NewServer(config *Config) (*Server, error) { + // Check for a data directory! + if config.DataDir == "" { + return nil, fmt.Errorf("Config must provide a DataDir") + } + + // Ensure we have a log output + if config.LogOutput == nil { + config.LogOutput = os.Stderr + } + + // Create a logger + logger := log.New(config.LogOutput, "", log.LstdFlags) + + // Create server + s := &Server{ + config: config, + eventChLocal: make(chan serf.Event, 256), + eventChRemote: make(chan serf.Event, 256), + logger: logger, + shutdownCh: make(chan struct{}), + } + + // Start the Serf listeners to prevent a deadlock + go s.localEventHandler() + go s.remoteEventHandler() + + // Initialize the local Serf + var err error + s.serfLocal, err = s.setupSerf(config.SerfLocalConfig, s.eventChLocal, serfLocalSnapshot) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start local serf: %v", err) + } + + // Initialize the remote Serf + s.serfRemote, err = s.setupSerf(config.SerfRemoteConfig, s.eventChRemote, serfRemoteSnapshot) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start remote serf: %v", err) + } + + // Initialize the Raft server + if err := s.setupRaft(); err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to start Raft: %v", err) + } + + return s, nil +} + +// ensurePath is used to make sure a path exists +func (s *Server) ensurePath(path string, dir bool) error { + if !dir { + path = filepath.Dir(path) + } + return os.MkdirAll(path, 0755) +} + +// setupSerf is used to setup and initialize a Serf +func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { + conf.NodeName = s.config.NodeName + conf.Role = fmt.Sprintf("consul:%s", s.config.Datacenter) + conf.MemberlistConfig.LogOutput = s.config.LogOutput + conf.LogOutput = s.config.LogOutput + conf.EventCh = ch + conf.SnapshotPath = filepath.Join(s.config.DataDir, path) + if err := s.ensurePath(conf.SnapshotPath, false); err != nil { + return nil, err + } + return serf.Create(conf) +} + +// setupRaft is used to setup and initialize Raft +func (s *Server) setupRaft() error { + // Create the base path + path := filepath.Join(s.config.DataDir, raftState) + if err := s.ensurePath(path, true); err != nil { + return err + } + + // Create the SQLite store for logs and stable storage + store, err := raft.NewSQLiteStore(path) + if err != nil { + return err + } + + // Create the snapshot store + snapshots, err := raft.NewFileSnapshotStore(path, 3) + if err != nil { + store.Close() + return err + } + + // Create a transport layer + trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second) + if err != nil { + store.Close() + return err + } + + // Setup the peer store + peers := raft.NewJSONPeers(path, trans) + + // Create the FSM + s.fsm = &consulFSM{server: s} + + // Setup the Raft store + raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots, + peers, trans) + if err != nil { + store.Close() + trans.Close() + return err + } + + s.raft = raft + s.raftStore = store + s.raftTransport = trans + return nil +} + +// Shutdown is used to shutdown the server +func (s *Server) Shutdown() error { + s.logger.Printf("[INFO] Shutting down Consul server") + s.shutdownLock.Lock() + defer s.shutdownLock.Unlock() + + if s.shutdown { + return nil + } + + s.shutdown = true + close(s.shutdownCh) + + if s.serfLocal != nil { + s.serfLocal.Shutdown() + s.serfLocal = nil + } + + if s.serfRemote != nil { + s.serfRemote.Shutdown() + s.serfRemote = nil + } + + if s.raft != nil { + s.raft.Shutdown() + s.raftStore.Close() + s.raftTransport.Close() + s.raft = nil + s.raftStore = nil + s.raftTransport = nil + } + return nil +} + +// localEventHandler is used to handle events from the local Serf cluster +func (s *Server) localEventHandler() { +} + +// remoteEventHandler is used to handle events from the remote Serf cluster +func (s *Server) remoteEventHandler() { +} diff --git a/consul/server_test.go b/consul/server_test.go new file mode 100644 index 000000000..040ccbf0e --- /dev/null +++ b/consul/server_test.go @@ -0,0 +1,37 @@ +package consul + +import ( + "io/ioutil" + "os" + "testing" +) + +func tmpDir(t *testing.T) string { + dir, err := ioutil.TempDir("", "consul") + if err != nil { + t.Fatalf("err: %v", err) + } + return dir +} + +func TestServer_StartStop(t *testing.T) { + dir := tmpDir(t) + defer os.RemoveAll(dir) + + config := DefaultConfig() + config.DataDir = dir + + server, err := NewServer(config) + if err != nil { + t.Fatalf("err: %v", err) + } + + if err := server.Shutdown(); err != nil { + t.Fatalf("err: %v", err) + } + + // Idempotent + if err := server.Shutdown(); err != nil { + t.Fatalf("err: %v", err) + } +}