Merge pull request #711 from hashicorp/f-scada

Add support for SCADA for Atlas Integration
This commit is contained in:
Armon Dadgar 2015-02-18 16:54:50 -08:00
commit 7316b5be32
17 changed files with 729 additions and 152 deletions

View File

@ -289,6 +289,49 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
return diff, resp, err
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Consul conventions.
func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
r := c.newRequest("GET", endpoint)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}
// write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Consul conventions.
func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
r := c.newRequest("PUT", endpoint)
r.setWriteOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
if out != nil {
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
}
return wm, nil
}
// parseQueryMeta is used to help parse query meta-data
func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
header := resp.Header

24
api/raw.go Normal file
View File

@ -0,0 +1,24 @@
package api
// Raw can be used to do raw queries against custom endpoints
type Raw struct {
c *Client
}
// Raw returns a handle to query endpoints
func (c *Client) Raw() *Raw {
return &Raw{c}
}
// Query is used to do a GET request against an endpoint
// and deserialize the response into an interface using
// standard Consul conventions.
func (raw *Raw) Query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
return raw.c.query(endpoint, out, q)
}
// Write is used to do a PUT request against an endpoint
// and serialize/deserialized using the standard Consul conventions.
func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
return raw.c.write(endpoint, in, out, q)
}

View File

@ -93,18 +93,9 @@ func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta,
}
func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) {
r := s.c.newRequest("PUT", "/v1/session/create")
r.setWriteOptions(q)
r.obj = obj
rtt, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
wm, err := s.c.write("/v1/session/create", obj, &out, q)
if err != nil {
return "", nil, err
}
return out.ID, wm, nil
@ -112,35 +103,20 @@ func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta,
// Destroy invalides a given session
func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
r := s.c.newRequest("PUT", "/v1/session/destroy/"+id)
r.setWriteOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q)
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
return wm, nil
}
// Renew renews the TTL on a given session
func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) {
r := s.c.newRequest("PUT", "/v1/session/renew/"+id)
r.setWriteOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
var entries []*SessionEntry
wm, err := s.c.write("/v1/session/renew/"+id, nil, &entries, q)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var entries []*SessionEntry
if err := decodeBody(resp, &entries); err != nil {
return nil, wm, err
}
if len(entries) > 0 {
return entries[0], wm, nil
}
@ -179,23 +155,11 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d
// Info looks up a single session
func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/session/info/"+id)
r.setQueryOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
var entries []*SessionEntry
qm, err := s.c.query("/v1/session/info/"+id, &entries, q)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*SessionEntry
if err := decodeBody(resp, &entries); err != nil {
return nil, nil, err
}
if len(entries) > 0 {
return entries[0], qm, nil
}
@ -204,20 +168,9 @@ func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, e
// List gets sessions for a node
func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/session/node/"+node)
r.setQueryOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*SessionEntry
if err := decodeBody(resp, &entries); err != nil {
qm, err := s.c.query("/v1/session/node/"+node, &entries, q)
if err != nil {
return nil, nil, err
}
return entries, qm, nil
@ -225,20 +178,9 @@ func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMet
// List gets all active sessions
func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
r := s.c.newRequest("GET", "/v1/session/list")
r.setQueryOptions(q)
rtt, resp, err := requireOK(s.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var entries []*SessionEntry
if err := decodeBody(resp, &entries); err != nil {
qm, err := s.c.query("/v1/session/list", &entries, q)
if err != nil {
return nil, nil, err
}
return entries, qm, nil

View File

@ -19,6 +19,7 @@ import (
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
scada "github.com/hashicorp/scada-client"
"github.com/mitchellh/cli"
)
@ -45,6 +46,7 @@ type Command struct {
rpcServer *AgentRPC
httpServers []*HTTPServer
dnsServer *DNSServer
scadaProvider *scada.Provider
}
// readConfig is responsible for setup of our configuration using
@ -76,6 +78,10 @@ func (c *Command) readConfig() *Config {
cmdFlags.StringVar(&cmdConfig.BindAddr, "bind", "", "address to bind server listeners to")
cmdFlags.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "address to advertise instead of bind addr")
cmdFlags.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", "infrastructure name in Atlas")
cmdFlags.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", "authentication token for Atlas")
cmdFlags.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, "auto-join with Atlas")
cmdFlags.IntVar(&cmdConfig.Protocol, "protocol", -1, "protocol version")
cmdFlags.BoolVar(&cmdConfig.EnableSyslog, "syslog", false,
@ -327,8 +333,21 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.Ui.Output("Starting Consul agent RPC...")
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent, config, logOutput)
// Enable the SCADA integration
var scadaList net.Listener
if config.AtlasInfrastructure != "" {
provider, list, err := NewProvider(config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}
c.scadaProvider = provider
scadaList = list
}
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil {
servers, err := NewHTTPServers(agent, config, scadaList, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err))
@ -378,7 +397,6 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
return nil
}
@ -586,10 +604,12 @@ func (c *Command) Run(args []string) int {
if c.dnsServer != nil {
defer c.dnsServer.Shutdown()
}
for _, server := range c.httpServers {
defer server.Shutdown()
}
if c.scadaProvider != nil {
defer c.scadaProvider.Shutdown()
}
// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
@ -628,6 +648,12 @@ func (c *Command) Run(args []string) int {
gossipEncrypted = c.agent.client.Encrypted()
}
// Determine the Atlas cluster
atlas := "<disabled>"
if config.AtlasInfrastructure != "" {
atlas = fmt.Sprintf("(Infrastructure: '%s' Join: %v)", config.AtlasInfrastructure, config.AtlasJoin)
}
// Let the agent know we've finished registration
c.agent.StartSync()
@ -641,6 +667,7 @@ func (c *Command) Run(args []string) int {
config.Ports.SerfLan, config.Ports.SerfWan))
c.Ui.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
gossipEncrypted, config.VerifyOutgoing, config.VerifyIncoming))
c.Ui.Info(fmt.Sprintf(" Atlas: %s", atlas))
// Enable log streaming
c.Ui.Info("")
@ -815,6 +842,9 @@ Usage: consul agent [options]
Options:
-advertise=addr Sets the advertise address to use
-atlas=org/name Sets the Atlas infrastructure name, enables SCADA.
-atlas-join Enables auto-joining the Atlas cluster
-atlas-token=token Provides the Atlas API token
-bootstrap Sets server to bootstrap mode
-bind=0.0.0.0 Sets the bind address for cluster communication
-bootstrap-expect=0 Sets server to expect bootstrap mode.

View File

@ -318,6 +318,23 @@ type Config struct {
// HTTPAPIResponseHeaders are used to add HTTP header response fields to the HTTP API responses.
HTTPAPIResponseHeaders map[string]string `mapstructure:"http_api_response_headers"`
// AtlasInfrastructure is the name of the infrastructure we belong to. e.g. hashicorp/stage
AtlasInfrastructure string `mapstructure:"atlas_infrastructure"`
// AtlasToken is our authentication token from Atlas
AtlasToken string `mapstructure:"atlas_token" json:"-"`
// AtlasACLToken is applied to inbound requests if no other token
// is provided. This takes higher precedence than the ACLToken.
// Without this, the ACLToken is used. If that is not specified either,
// then the 'anonymous' token is used. This can be set to 'anonymous'
// to reduce the Atlas privileges to below that of the ACLToken.
AtlasACLToken string `mapstructure:"atlas_acl_token" json:"-"`
// AtlasJoin controls if Atlas will attempt to auto-join the node
// to it's cluster. Requires Atlas integration.
AtlasJoin bool `mapstructure:"atlas_join"`
// AEInterval controls the anti-entropy interval. This is how often
// the agent attempts to reconcile it's local state with the server'
// representation of our state. Defaults to every 60s.
@ -941,6 +958,18 @@ func MergeConfig(a, b *Config) *Config {
if b.UnixSockets.Perms != "" {
result.UnixSockets.Perms = b.UnixSockets.Perms
}
if b.AtlasInfrastructure != "" {
result.AtlasInfrastructure = b.AtlasInfrastructure
}
if b.AtlasToken != "" {
result.AtlasToken = b.AtlasToken
}
if b.AtlasACLToken != "" {
result.AtlasACLToken = b.AtlasACLToken
}
if b.AtlasJoin {
result.AtlasJoin = true
}
if len(b.HTTPAPIResponseHeaders) != 0 {
if result.HTTPAPIResponseHeaders == nil {

View File

@ -633,6 +633,26 @@ func TestDecodeConfig(t *testing.T) {
if config.HTTPAPIResponseHeaders["X-XSS-Protection"] != "1; mode=block" {
t.Fatalf("bad: %#v", config)
}
// Atlas configs
input = `{"atlas_infrastructure": "hashicorp/prod", "atlas_token": "abcdefg", "atlas_acl_token": "123456789", "atlas_join": true}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.AtlasInfrastructure != "hashicorp/prod" {
t.Fatalf("bad: %#v", config)
}
if config.AtlasToken != "abcdefg" {
t.Fatalf("bad: %#v", config)
}
if config.AtlasACLToken != "123456789" {
t.Fatalf("bad: %#v", config)
}
if !config.AtlasJoin {
t.Fatalf("bad: %#v", config)
}
}
func TestDecodeConfig_invalidKeys(t *testing.T) {
@ -1096,6 +1116,10 @@ func TestMergeConfig(t *testing.T) {
Perms: "0700",
},
},
AtlasInfrastructure: "hashicorp/prod",
AtlasToken: "123456789",
AtlasACLToken: "abcdefgh",
AtlasJoin: true,
}
c := MergeConfig(a, b)

View File

@ -19,6 +19,14 @@ import (
"github.com/mitchellh/mapstructure"
)
var (
// scadaHTTPAddr is the address associated with the
// HTTPServer. When populating an ACL token for a request,
// this is checked to switch between the ACLToken and
// AtlasACLToken
scadaHTTPAddr = "SCADA"
)
// HTTPServer is used to wrap an Agent and expose various API's
// in a RESTful manner
type HTTPServer struct {
@ -32,15 +40,11 @@ type HTTPServer struct {
// NewHTTPServers starts new HTTP servers to provide an interface to
// the agent.
func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) {
var tlsConfig *tls.Config
var list net.Listener
var httpAddr net.Addr
var err error
func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) {
var servers []*HTTPServer
if config.Ports.HTTPS > 0 {
httpAddr, err = config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
if err != nil {
return nil, err
}
@ -54,7 +58,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS
NodeName: config.NodeName,
ServerName: config.ServerName}
tlsConfig, err = tlsConf.IncomingTLSConfig()
tlsConfig, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
@ -64,7 +68,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
list = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
// Create the mux
mux := http.NewServeMux()
@ -86,7 +90,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS
}
if config.Ports.HTTP > 0 {
httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err)
}
@ -107,6 +111,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
var list net.Listener
if isSocket {
// Set up ownership/permission bits on the socket file
if err := setFilePermissions(socketPath, config.UnixSockets); err != nil {
@ -136,6 +141,26 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS
servers = append(servers, srv)
}
if scada != nil {
// Create the mux
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: scada,
logger: log.New(logOutput, "", log.LstdFlags),
uiDir: config.UiDir,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA
// Start the server
go http.Serve(scada, mux)
servers = append(servers, srv)
}
return servers, nil
}
@ -159,7 +184,7 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
// Shutdown is used to shutdown the HTTP server
func (s *HTTPServer) Shutdown() {
if s != nil {
s.logger.Printf("[DEBUG] http: Shutting down http server(%v)", s.addr)
s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr)
s.listener.Close()
}
}
@ -241,7 +266,10 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
if s.uiDir != "" {
// Static file serving done from /ui/
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.uiDir))))
}
// Enable the special endpoints for UI or SCADA
if s.uiDir != "" || s.agent.config.AtlasInfrastructure != "" {
// API's are under /internal/ui/ to avoid conflict
s.mux.HandleFunc("/v1/internal/ui/nodes", s.wrap(s.UINodes))
s.mux.HandleFunc("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
@ -422,9 +450,17 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) {
func (s *HTTPServer) parseToken(req *http.Request, token *string) {
if other := req.URL.Query().Get("token"); other != "" {
*token = other
} else if *token == "" {
*token = s.agent.config.ACLToken
return
}
// Set the AtlasACLToken if SCADA
if s.addr == scadaHTTPAddr && s.agent.config.AtlasACLToken != "" {
*token = s.agent.config.AtlasACLToken
return
}
// Set the default ACLToken
*token = s.agent.config.ACLToken
}
// parse is a convenience method for endpoints that need

View File

@ -36,7 +36,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe
t.Fatalf("err: %v", err)
}
conf.UiDir = uiDir
servers, err := NewHTTPServers(agent, conf, agent.logOutput)
servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -146,7 +146,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
defer os.RemoveAll(dir)
// Try to start the server with the same path anyways.
if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil {
if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil {
t.Fatalf("err: %s", err)
}
@ -429,6 +429,67 @@ func TestParseConsistency_Invalid(t *testing.T) {
}
}
// Test ACL token is resolved in correct order
func TestACLResolution(t *testing.T) {
var token string
// Request without token
req, err := http.NewRequest("GET",
"/v1/catalog/nodes", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
// Request with explicit token
reqToken, err := http.NewRequest("GET",
"/v1/catalog/nodes?token=foo", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
httpTest(t, func(srv *HTTPServer) {
// Check when no token is set
srv.agent.config.ACLToken = ""
srv.parseToken(req, &token)
if token != "" {
t.Fatalf("bad: %s", token)
}
// Check when ACLToken set
srv.agent.config.ACLToken = "agent"
srv.parseToken(req, &token)
if token != "agent" {
t.Fatalf("bad: %s", token)
}
// Check when AtlasACLToken set, wrong server
srv.agent.config.AtlasACLToken = "atlas"
srv.parseToken(req, &token)
if token != "agent" {
t.Fatalf("bad: %s", token)
}
// Check when AtlasACLToken set, correct server
srv.addr = scadaHTTPAddr
srv.parseToken(req, &token)
if token != "atlas" {
t.Fatalf("bad: %s", token)
}
// Check when AtlasACLToken not, correct server
srv.agent.config.AtlasACLToken = ""
srv.parseToken(req, &token)
if token != "agent" {
t.Fatalf("bad: %s", token)
}
// Explicit token has highest precedence
srv.parseToken(reqToken, &token)
if token != "foo" {
t.Fatalf("bad: %s", token)
}
})
}
// assertIndex tests that X-Consul-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Consul-Index")

192
command/agent/scada.go Normal file
View File

@ -0,0 +1,192 @@
package agent
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/hashicorp/scada-client"
)
const (
// providerService is the service name we use
providerService = "consul"
// resourceType is the type of resource we represent
// when connecting to SCADA
resourceType = "infrastructures"
)
// ProviderService returns the service information for the provider
func ProviderService(c *Config) *client.ProviderService {
return &client.ProviderService{
Service: providerService,
ServiceVersion: fmt.Sprintf("%s%s", c.Version, c.VersionPrerelease),
Capabilities: map[string]int{
"http": 1,
},
Meta: map[string]string{
"auto-join": strconv.FormatBool(c.AtlasJoin),
"datacenter": c.Datacenter,
"server": strconv.FormatBool(c.Server),
},
ResourceType: resourceType,
}
}
// ProviderConfig returns the configuration for the SCADA provider
func ProviderConfig(c *Config) *client.ProviderConfig {
return &client.ProviderConfig{
Service: ProviderService(c),
Handlers: map[string]client.CapabilityProvider{
"http": nil,
},
ResourceGroup: c.AtlasInfrastructure,
Token: c.AtlasToken,
}
}
// NewProvider creates a new SCADA provider using the
// given configuration. Requests for the HTTP capability
// are passed off to the listener that is returned.
func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener, error) {
// Get the configuration of the provider
config := ProviderConfig(c)
config.LogOutput = logOutput
// SCADA_INSECURE env variable is used for testing to disable
// TLS certificate verification.
if os.Getenv("SCADA_INSECURE") != "" {
config.TLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
// Create an HTTP listener and handler
list := newScadaListener(c.AtlasInfrastructure)
config.Handlers["http"] = func(capability string, meta map[string]string,
conn io.ReadWriteCloser) error {
return list.PushRWC(conn)
}
// Create the provider
provider, err := client.NewProvider(config)
if err != nil {
list.Close()
return nil, nil, err
}
return provider, list, nil
}
// scadaListener is used to return a net.Listener for
// incoming SCADA connections
type scadaListener struct {
addr *scadaAddr
pending chan net.Conn
closed bool
closedCh chan struct{}
l sync.Mutex
}
// newScadaListener returns a new listener
func newScadaListener(infra string) *scadaListener {
l := &scadaListener{
addr: &scadaAddr{infra},
pending: make(chan net.Conn),
closedCh: make(chan struct{}),
}
return l
}
// PushRWC is used to push a io.ReadWriteCloser as a net.Conn
func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error {
// Check if this already implements net.Conn
if nc, ok := conn.(net.Conn); ok {
return s.Push(nc)
}
// Wrap to implement the interface
wrapped := &scadaRWC{conn, s.addr}
return s.Push(wrapped)
}
// Push is used to add a connection to the queu
func (s *scadaListener) Push(conn net.Conn) error {
select {
case s.pending <- conn:
return nil
case <-time.After(time.Second):
return fmt.Errorf("accept timed out")
case <-s.closedCh:
return fmt.Errorf("scada listener closed")
}
}
func (s *scadaListener) Accept() (net.Conn, error) {
select {
case conn := <-s.pending:
return conn, nil
case <-s.closedCh:
return nil, fmt.Errorf("scada listener closed")
}
}
func (s *scadaListener) Close() error {
s.l.Lock()
defer s.l.Unlock()
if s.closed {
return nil
}
s.closed = true
close(s.closedCh)
return nil
}
func (s *scadaListener) Addr() net.Addr {
return s.addr
}
// scadaAddr is used to return a net.Addr for SCADA
type scadaAddr struct {
infra string
}
func (s *scadaAddr) Network() string {
return "SCADA"
}
func (s *scadaAddr) String() string {
return fmt.Sprintf("SCADA::Atlas::%s", s.infra)
}
type scadaRWC struct {
io.ReadWriteCloser
addr *scadaAddr
}
func (s *scadaRWC) LocalAddr() net.Addr {
return s.addr
}
func (s *scadaRWC) RemoteAddr() net.Addr {
return s.addr
}
func (s *scadaRWC) SetDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
}
func (s *scadaRWC) SetReadDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
}
func (s *scadaRWC) SetWriteDeadline(t time.Time) error {
return errors.New("SCADA.Conn does not support deadlines")
}

104
command/agent/scada_test.go Normal file
View File

@ -0,0 +1,104 @@
package agent
import (
"net"
"reflect"
"testing"
"github.com/hashicorp/scada-client"
)
func TestProviderService(t *testing.T) {
conf := DefaultConfig()
conf.Version = "0.5.0"
conf.VersionPrerelease = "rc1"
conf.AtlasJoin = true
conf.Server = true
ps := ProviderService(conf)
expect := &client.ProviderService{
Service: "consul",
ServiceVersion: "0.5.0rc1",
Capabilities: map[string]int{
"http": 1,
},
Meta: map[string]string{
"auto-join": "true",
"datacenter": "dc1",
"server": "true",
},
ResourceType: "infrastructures",
}
if !reflect.DeepEqual(ps, expect) {
t.Fatalf("bad: %v", ps)
}
}
func TestProviderConfig(t *testing.T) {
conf := DefaultConfig()
conf.Version = "0.5.0"
conf.VersionPrerelease = "rc1"
conf.AtlasJoin = true
conf.Server = true
conf.AtlasInfrastructure = "armon/test"
conf.AtlasToken = "foobarbaz"
pc := ProviderConfig(conf)
expect := &client.ProviderConfig{
Service: &client.ProviderService{
Service: "consul",
ServiceVersion: "0.5.0rc1",
Capabilities: map[string]int{
"http": 1,
},
Meta: map[string]string{
"auto-join": "true",
"datacenter": "dc1",
"server": "true",
},
ResourceType: "infrastructures",
},
Handlers: map[string]client.CapabilityProvider{
"http": nil,
},
ResourceGroup: "armon/test",
Token: "foobarbaz",
}
if !reflect.DeepEqual(pc, expect) {
t.Fatalf("bad: %v", pc)
}
}
func TestSCADAListener(t *testing.T) {
list := newScadaListener("armon/test")
defer list.Close()
var raw interface{} = list
_, ok := raw.(net.Listener)
if !ok {
t.Fatalf("bad")
}
a, b := net.Pipe()
defer a.Close()
defer b.Close()
go list.Push(a)
out, err := list.Accept()
if err != nil {
t.Fatalf("err: %v", err)
}
if out != a {
t.Fatalf("bad")
}
}
func TestSCADAAddr(t *testing.T) {
var addr interface{} = &scadaAddr{"armon/test"}
_, ok := addr.(net.Addr)
if !ok {
t.Fatalf("bad")
}
}

View File

@ -1,10 +1,11 @@
package agent
import (
"github.com/hashicorp/consul/consul/structs"
"net/http"
"sort"
"strings"
"github.com/hashicorp/consul/consul/structs"
)
// ServiceSummary is used to summarize a service
@ -19,99 +20,88 @@ type ServiceSummary struct {
// UINodes is used to list the nodes in a given datacenter. We return a
// NodeDump which provides overview information for all the nodes
func (s *HTTPServer) UINodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the datacenter
var dc string
s.parseDC(req, &dc)
// Try to ge ta node dump
var dump structs.NodeDump
if err := s.getNodeDump(resp, dc, "", &dump); err != nil {
return nil, err
// Parse arguments
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
return dump, nil
// Make the RPC request
var out structs.IndexedNodeDump
defer setMeta(resp, &out.QueryMeta)
RPC:
if err := s.agent.RPC("Internal.NodeDump", &args, &out); err != nil {
// Retry the request allowing stale data if no leader
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
args.AllowStale = true
goto RPC
}
return nil, err
}
return out.Dump, nil
}
// UINodeInfo is used to get info on a single node in a given datacenter. We return a
// NodeInfo which provides overview information for the node
func (s *HTTPServer) UINodeInfo(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the datacenter
var dc string
s.parseDC(req, &dc)
// Parse arguments
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Verify we have some DC, or use the default
node := strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/node/")
if node == "" {
args.Node = strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/node/")
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return nil, nil
}
// Try to get a node dump
var dump structs.NodeDump
if err := s.getNodeDump(resp, dc, node, &dump); err != nil {
// Make the RPC request
var out structs.IndexedNodeDump
defer setMeta(resp, &out.QueryMeta)
RPC:
if err := s.agent.RPC("Internal.NodeInfo", &args, &out); err != nil {
// Retry the request allowing stale data if no leader
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
args.AllowStale = true
goto RPC
}
return nil, err
}
// Return only the first entry
if len(dump) > 0 {
return dump[0], nil
if len(out.Dump) > 0 {
return out.Dump[0], nil
}
return nil, nil
}
// getNodeDump is used to get a dump of all node data. We make a best effort by
// reading stale data in the case of an availability outage.
func (s *HTTPServer) getNodeDump(resp http.ResponseWriter, dc, node string, dump *structs.NodeDump) error {
var args interface{}
var method string
var allowStale *bool
if node == "" {
raw := structs.DCSpecificRequest{Datacenter: dc}
method = "Internal.NodeDump"
allowStale = &raw.AllowStale
args = &raw
} else {
raw := &structs.NodeSpecificRequest{Datacenter: dc, Node: node}
method = "Internal.NodeInfo"
allowStale = &raw.AllowStale
args = &raw
}
var out structs.IndexedNodeDump
defer setMeta(resp, &out.QueryMeta)
START:
if err := s.agent.RPC(method, args, &out); err != nil {
// Retry the request allowing stale data if no leader. The UI should continue
// to function even during an outage
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !*allowStale {
*allowStale = true
goto START
}
return err
}
// Set the result
*dump = out.Dump
return nil
}
// UIServices is used to list the services in a given datacenter. We return a
// ServiceSummary which provides overview information for the service
func (s *HTTPServer) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the datacenter
var dc string
s.parseDC(req, &dc)
// Parse arguments
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Get the full node dump...
var dump structs.NodeDump
if err := s.getNodeDump(resp, dc, "", &dump); err != nil {
// Make the RPC request
var out structs.IndexedNodeDump
defer setMeta(resp, &out.QueryMeta)
RPC:
if err := s.agent.RPC("Internal.NodeDump", &args, &out); err != nil {
// Retry the request allowing stale data if no leader
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
args.AllowStale = true
goto RPC
}
return nil, err
}
// Generate the summary
return summarizeServices(dump), nil
return summarizeServices(out.Dump), nil
}
func summarizeServices(dump structs.NodeDump) []*ServiceSummary {

View File

@ -39,6 +39,7 @@ $ consul agent -data-dir=/tmp/consul
Server: false (bootstrap: false)
Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600, RPC: 8400)
Cluster Addr: 192.168.1.43 (LAN: 8301, WAN: 8302)
Atlas: (Infrastructure: 'hashicorp/test' Join: true)
==> Log data will now stream in as it occurs:
@ -75,6 +76,11 @@ There are several important messages that `consul agent` outputs:
Consul agents in a cluster. Not all Consul agents in a cluster have to
use the same port, but this address **MUST** be reachable by all other nodes.
* **Atlas**: This shows the [Atlas infrastructure](https://atlas.hashicorp.com)
the node is registered with. It also indicates if auto join is enabled.
The Atlas infrastructure is set using `-atlas` and auto-join is enabled by
setting `-atlas-join`.
## Stopping an Agent
An agent can be stopped in two ways: gracefully or forcefully. To gracefully

View File

@ -40,6 +40,17 @@ The options below are all specified on the command-line.
If this address is not routable, the node will be in a constant flapping state
as other nodes will treat the non-routability as a failure.
* <a id="atlas"></a>`-atlas` - This flag enables [Atlas](https://atlas.hashicorp.com) integration.
It is used to provide the Atlas infrastructure name and the SCADA connection.
This enables Atlas features such as the dashboard and node auto joining.
* <a id="atlas_join"></a>`-atlas-join` - When set, enables auto-join via Atlas. Atlas will track the most
recent members to join the infrastructure named by `-atlas` and automatically
join them on start. For servers, the LAN and WAN pool are both joined.
* <a id="atlas_token"></a>`-atlas-token` - Provides the Atlas API authentication token. This can also be provided
using the `ATLAS_TOKEN` environment variable. Required for use with Atlas.
* <a id="bootstrap_anchor"></a>`-bootstrap` - This flag is used to control if a server is in "bootstrap" mode. It is important that
no more than one server *per* data center be running in this mode. Technically, a server in bootstrap mode
is allowed to self-elect as the Raft leader. It is important that only a single node is in this mode;
@ -260,6 +271,16 @@ definitions support being updated during a reload.
* `advertise_addr` - Equivalent to the [`-advertise` command-line flag](#advertise).
* `atlas_acl_token` - When provided, any requests made by Atlas will use this ACL
token unless explicitly overriden. When not provided the `acl_token` is used.
This can be set to 'anonymous' to reduce permission below that of `acl_token`.
* `atlas_infrastructure` - Equivalent to the [`-atlas` command-line flag](#atlas).
* `atlas_join` - Equivalent to the [`-atlas-join` command-line flag](#atlas_join).
* `atlas_token` - Equivalent to the [`-atlas-token` command-line flag](#atlas_token).
* `bootstrap` - Equivalent to the [`-bootstrap` command-line flag](#bootstrap_anchor).
* `bootstrap_expect` - Equivalent to the [`-bootstrap-expect` command-line flag](#bootstrap_expect).

View File

@ -28,4 +28,14 @@ and can be disabled.
See [`disable_anonymous_signature`](/docs/agent/options.html#disable_anonymous_signature)
and [`disable_update_check`](/docs/agent/options.html#disable_update_check).
## Q: How does Atlas integration work?
Consul makes use of a HashiCorp service called [SCADA](http://scada.hashicorp.com)
which stands for Supervisory Control And Data Acquisition. The SCADA system allows
clients to maintain a long-running connection to Atlas which is used to make requests
to Consul agents for features like the dashboard and auto joining. Standard ACLs can
be applied to the SCADA connection, which has no enhanced or elevated privileges.
Using the SCADA service is optional and only enabled by opt-in.
See the [Atlas integration guide](/docs/guides/atlas.html).

View File

@ -0,0 +1,59 @@
---
layout: "docs"
page_title: "Atlas Integration"
sidebar_current: "docs-guides-atlas"
description: |-
This guide covers how to integrate Atlas with Consul to provide features like an infrastructure dashboard and automatic cluster joining.
---
# Atlas Integration
[Atlas](https://atlas.hashicorp.com) is service provided by HashiCorp to deploy applications and manage infrastructure.
Starting with Consul 0.5, it is possible to integrate Consul with Atlas. This is done by registering a node as part
of an Atlas infrastructure (specified with the `-atlas` flag). Consul maintains a long running connection to the
[SCADA](http://scada.hashicorp.com) service which allows Atlas to retrieve data and control nodes.
Data acquisition allows Atlas to display the state of the Consul cluster in its dashboard as well as enabling
alerts to be setup using health checks. Remote control enables Atlas to provide features like the auto joinining
nodes.
## Enabling Atlas Integration
To enable Atlas integration, you must specify the name of the Atlas infrastructure and the Atlas authentication
token. The Atlas infrastructure name can be set either with the `-atlas` CLI flag, or with the `atlas_infrastructure`
[configuration option](/docs/agent/options.html). The Atlas token is set with the `-atlas-token` CLI flag, `atlas_token`
configuration option, or `ATLAS_TOKEN` environment variable.
To verify the integration, either run the agent with `debug` level logging or use `consul monitor -log-level=debug`
and look for a line like:
[DEBUG] scada-client: assigned session '406ca55d-1801-f964-2942-45f5f9df3995'
This shows that the Consul agent was successfully able to register with the SCADA service.
## Using Auto-Join
Once integrated with Atlas, the auto join feature can be used to have nodes automatically join other
peers in their datacenter. Server nodes will automatically join peer LAN nodes and other WAN nodes.
Client nodes will only join other LAN nodes in their datacenter.
Auto join is enabled with the `-atlas-join` CLI flag or the `atlas_join` configuration option.
## Securing Atlas
The connection to Atlas does not have elevated privileges. API requests made by Atlas
are served in the same way any other HTTP request is made. If ACLs are enabled, it is possible to
force an Atlas ACL token to be used instead of the agent's default token.
When ACLs are enabled, the `atlas_acl_token` configuration option can be specified. This changes
the ACL token resolution order to be:
1. Request specific token provided by `?token=`. These tokens are set in the Atlas UI.
2. The `atlas_acl_token` if configured.
3. The `acl_token` if configured.
4. The `anonymous` token.
Because the `acl_token` typically has elevated permissions compared to the `anonymous` token,
the `atlas_acl_token` can be set to `anonymous` to drop privileges that would otherwise be
inherited from the agent.

View File

@ -14,6 +14,8 @@ guidance to do them safely.
The following guides are available:
* [Atlas Integration](/docs/guides/atlas.html) - This guide covers how to integrate [Atlas](https://atlas.hashicorp.com) with Consul.
* [Adding/Removing Servers](/docs/guides/servers.html) - This guide covers how to safely add and remove Consul servers from the cluster. This should be done carefully to avoid availability outages.
* [Bootstrapping](/docs/guides/bootstrapping.html) - This guide covers bootstrapping a new datacenter. This covers safely adding the initial Consul servers.

View File

@ -196,7 +196,11 @@
<li<%= sidebar_current("docs-guides") %>>
<a href="/docs/guides/index.html">Guides</a>
<ul class="nav">
<ul class="nav">
<li<%= sidebar_current("docs-guides-atlas") %>>
<a href="/docs/guides/atlas.html">Atlas Integration</a>
</li>
<li<%= sidebar_current("docs-guides-servers") %>>
<a href="/docs/guides/servers.html">Adding/Removing Servers</a>
</li>