consul: adding basic skeleton
This commit is contained in:
parent
d39b1765c6
commit
94ff23d2a4
|
@ -0,0 +1,27 @@
|
|||
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
|
||||
|
||||
all: deps
|
||||
@mkdir -p bin/
|
||||
@bash --norc -i ./scripts/build.sh
|
||||
|
||||
cov:
|
||||
gocov test ./... | gocov-html > /tmp/coverage.html
|
||||
open /tmp/coverage.html
|
||||
|
||||
deps:
|
||||
go get -d -v ./...
|
||||
echo $(DEPS) | xargs -n1 go get -d
|
||||
|
||||
test: deps
|
||||
go list ./... | xargs -n1 go test
|
||||
|
||||
integ:
|
||||
go list ./... | INTEG_TESTS=yes xargs -n1 go test
|
||||
|
||||
web:
|
||||
./scripts/website_run.sh
|
||||
|
||||
web-push:
|
||||
./scripts/website_push.sh
|
||||
|
||||
.PNONY: all cov deps integ test web web-push
|
|
@ -0,0 +1,70 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultRaftAddr = "0.0.0.0:8300"
|
||||
DefaultLANSerfPort = 8301
|
||||
DefaultWANSerfPort = 8302
|
||||
)
|
||||
|
||||
// Config is used to configure the server
|
||||
type Config struct {
|
||||
// Datacenter is the datacenter this Consul server represents
|
||||
Datacenter string
|
||||
|
||||
// DataDir is the directory to store our state in
|
||||
DataDir string
|
||||
|
||||
// Node name is the name we use to advertise. Defaults to hostname.
|
||||
NodeName string
|
||||
|
||||
// Bind address for Raft (TCP)
|
||||
RaftBindAddr string
|
||||
|
||||
// RaftConfig is the configuration used for Raft in the local DC
|
||||
RaftConfig *raft.Config
|
||||
|
||||
// SerfLocalConfig is the configuration for the local serf
|
||||
SerfLocalConfig *serf.Config
|
||||
|
||||
// SerfRemoteConfig is the configuration for the remtoe serf
|
||||
SerfRemoteConfig *serf.Config
|
||||
|
||||
// LogOutput is the location to write logs to. If this is not set,
|
||||
// logs will go to stderr.
|
||||
LogOutput io.Writer
|
||||
}
|
||||
|
||||
// DefaultConfig is used to return a sane default configuration
|
||||
func DefaultConfig() *Config {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
conf := &Config{
|
||||
Datacenter: "dc1",
|
||||
NodeName: hostname,
|
||||
RaftBindAddr: DefaultRaftAddr,
|
||||
RaftConfig: raft.DefaultConfig(),
|
||||
SerfLocalConfig: serf.DefaultConfig(),
|
||||
SerfRemoteConfig: serf.DefaultConfig(),
|
||||
}
|
||||
|
||||
// Remote Serf should use the WAN timing, since we are using it
|
||||
// to communicate between DC's
|
||||
conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig()
|
||||
|
||||
// Ensure we don't have port conflicts
|
||||
conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort
|
||||
conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort
|
||||
|
||||
return conf
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/raft"
|
||||
"io"
|
||||
)
|
||||
|
||||
// consulFSM implements a finite state machine that is used
|
||||
// along with Raft to provide strong consistency for various
|
||||
// data that requires it. We implement this outside the Server
|
||||
// to avoid exposing this outside the package.
|
||||
type consulFSM struct {
|
||||
server *Server
|
||||
}
|
||||
|
||||
// consulSnapshot is used to provide a snapshot of the current
|
||||
// state in a way that can be accessed concurrently with operations
|
||||
// that may modify the live state.
|
||||
type consulSnapshot struct {
|
||||
fsm *consulFSM
|
||||
}
|
||||
|
||||
func (c *consulFSM) Apply([]byte) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||
snap := &consulSnapshot{fsm: c}
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (c *consulFSM) Restore(io.ReadCloser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *consulSnapshot) Release() {
|
||||
}
|
|
@ -0,0 +1,223 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
serfLocalSnapshot = "serf/local.snapshot"
|
||||
serfRemoteSnapshot = "serf/remote.snapshot"
|
||||
raftState = "raft/"
|
||||
)
|
||||
|
||||
// Server is Consul server which manages the service discovery,
|
||||
// health checking, DC forwarding, Raft, and multiple Serf pools.
|
||||
type Server struct {
|
||||
config *Config
|
||||
|
||||
// eventChLocal is used to receive events from the
|
||||
// serfLocal cluster
|
||||
eventChLocal chan serf.Event
|
||||
|
||||
// eventChRemote is used to receive events from the
|
||||
// serfRemote cluster
|
||||
eventChRemote chan serf.Event
|
||||
|
||||
// fsm is the state machine used with Raft to provide
|
||||
// strong consistency.
|
||||
fsm *consulFSM
|
||||
|
||||
// Logger uses the provided LogOutput
|
||||
logger *log.Logger
|
||||
|
||||
// The raft instance is used among Consul nodes within the
|
||||
// DC to protect operations that require strong consistency
|
||||
raft *raft.Raft
|
||||
raftStore *raft.SQLiteStore
|
||||
raftTransport *raft.NetworkTransport
|
||||
|
||||
// serfLocal is the Serf cluster maintained inside the DC
|
||||
// which contains all the DC nodes
|
||||
serfLocal *serf.Serf
|
||||
|
||||
// serfRemote is the Serf cluster maintained between DC's
|
||||
// which SHOULD only consist of Consul servers
|
||||
serfRemote *serf.Serf
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewServer is used to construct a new Consul server from the
|
||||
// configuration, potentially returning an error
|
||||
func NewServer(config *Config) (*Server, error) {
|
||||
// Check for a data directory!
|
||||
if config.DataDir == "" {
|
||||
return nil, fmt.Errorf("Config must provide a DataDir")
|
||||
}
|
||||
|
||||
// Ensure we have a log output
|
||||
if config.LogOutput == nil {
|
||||
config.LogOutput = os.Stderr
|
||||
}
|
||||
|
||||
// Create a logger
|
||||
logger := log.New(config.LogOutput, "", log.LstdFlags)
|
||||
|
||||
// Create server
|
||||
s := &Server{
|
||||
config: config,
|
||||
eventChLocal: make(chan serf.Event, 256),
|
||||
eventChRemote: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the Serf listeners to prevent a deadlock
|
||||
go s.localEventHandler()
|
||||
go s.remoteEventHandler()
|
||||
|
||||
// Initialize the local Serf
|
||||
var err error
|
||||
s.serfLocal, err = s.setupSerf(config.SerfLocalConfig, s.eventChLocal, serfLocalSnapshot)
|
||||
if err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start local serf: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the remote Serf
|
||||
s.serfRemote, err = s.setupSerf(config.SerfRemoteConfig, s.eventChRemote, serfRemoteSnapshot)
|
||||
if err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start remote serf: %v", err)
|
||||
}
|
||||
|
||||
// Initialize the Raft server
|
||||
if err := s.setupRaft(); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start Raft: %v", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ensurePath is used to make sure a path exists
|
||||
func (s *Server) ensurePath(path string, dir bool) error {
|
||||
if !dir {
|
||||
path = filepath.Dir(path)
|
||||
}
|
||||
return os.MkdirAll(path, 0755)
|
||||
}
|
||||
|
||||
// setupSerf is used to setup and initialize a Serf
|
||||
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
|
||||
conf.NodeName = s.config.NodeName
|
||||
conf.Role = fmt.Sprintf("consul:%s", s.config.Datacenter)
|
||||
conf.MemberlistConfig.LogOutput = s.config.LogOutput
|
||||
conf.LogOutput = s.config.LogOutput
|
||||
conf.EventCh = ch
|
||||
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
|
||||
if err := s.ensurePath(conf.SnapshotPath, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return serf.Create(conf)
|
||||
}
|
||||
|
||||
// setupRaft is used to setup and initialize Raft
|
||||
func (s *Server) setupRaft() error {
|
||||
// Create the base path
|
||||
path := filepath.Join(s.config.DataDir, raftState)
|
||||
if err := s.ensurePath(path, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the SQLite store for logs and stable storage
|
||||
store, err := raft.NewSQLiteStore(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the snapshot store
|
||||
snapshots, err := raft.NewFileSnapshotStore(path, 3)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a transport layer
|
||||
trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup the peer store
|
||||
peers := raft.NewJSONPeers(path, trans)
|
||||
|
||||
// Create the FSM
|
||||
s.fsm = &consulFSM{server: s}
|
||||
|
||||
// Setup the Raft store
|
||||
raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
|
||||
peers, trans)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
trans.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
s.raft = raft
|
||||
s.raftStore = store
|
||||
s.raftTransport = trans
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the server
|
||||
func (s *Server) Shutdown() error {
|
||||
s.logger.Printf("[INFO] Shutting down Consul server")
|
||||
s.shutdownLock.Lock()
|
||||
defer s.shutdownLock.Unlock()
|
||||
|
||||
if s.shutdown {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.shutdown = true
|
||||
close(s.shutdownCh)
|
||||
|
||||
if s.serfLocal != nil {
|
||||
s.serfLocal.Shutdown()
|
||||
s.serfLocal = nil
|
||||
}
|
||||
|
||||
if s.serfRemote != nil {
|
||||
s.serfRemote.Shutdown()
|
||||
s.serfRemote = nil
|
||||
}
|
||||
|
||||
if s.raft != nil {
|
||||
s.raft.Shutdown()
|
||||
s.raftStore.Close()
|
||||
s.raftTransport.Close()
|
||||
s.raft = nil
|
||||
s.raftStore = nil
|
||||
s.raftTransport = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// localEventHandler is used to handle events from the local Serf cluster
|
||||
func (s *Server) localEventHandler() {
|
||||
}
|
||||
|
||||
// remoteEventHandler is used to handle events from the remote Serf cluster
|
||||
func (s *Server) remoteEventHandler() {
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func tmpDir(t *testing.T) string {
|
||||
dir, err := ioutil.TempDir("", "consul")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
return dir
|
||||
}
|
||||
|
||||
func TestServer_StartStop(t *testing.T) {
|
||||
dir := tmpDir(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
config := DefaultConfig()
|
||||
config.DataDir = dir
|
||||
|
||||
server, err := NewServer(config)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err := server.Shutdown(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Idempotent
|
||||
if err := server.Shutdown(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue