diff --git a/api/api.go b/api/api.go index aa0060816..76acad1b6 100644 --- a/api/api.go +++ b/api/api.go @@ -289,6 +289,49 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { return diff, resp, err } +// Query is used to do a GET request against an endpoint +// and deserialize the response into an interface using +// standard Consul conventions. +func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { + r := c.newRequest("GET", endpoint) + r.setQueryOptions(q) + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + if err := decodeBody(resp, out); err != nil { + return nil, err + } + return qm, nil +} + +// write is used to do a PUT request against an endpoint +// and serialize/deserialized using the standard Consul conventions. +func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { + r := c.newRequest("PUT", endpoint) + r.setWriteOptions(q) + r.obj = in + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{RequestTime: rtt} + if out != nil { + if err := decodeBody(resp, &out); err != nil { + return nil, err + } + } + return wm, nil +} + // parseQueryMeta is used to help parse query meta-data func parseQueryMeta(resp *http.Response, q *QueryMeta) error { header := resp.Header diff --git a/api/raw.go b/api/raw.go new file mode 100644 index 000000000..745a208c9 --- /dev/null +++ b/api/raw.go @@ -0,0 +1,24 @@ +package api + +// Raw can be used to do raw queries against custom endpoints +type Raw struct { + c *Client +} + +// Raw returns a handle to query endpoints +func (c *Client) Raw() *Raw { + return &Raw{c} +} + +// Query is used to do a GET request against an endpoint +// and deserialize the response into an interface using +// standard Consul conventions. +func (raw *Raw) Query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { + return raw.c.query(endpoint, out, q) +} + +// Write is used to do a PUT request against an endpoint +// and serialize/deserialized using the standard Consul conventions. +func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { + return raw.c.write(endpoint, in, out, q) +} diff --git a/api/session.go b/api/session.go index bb84644fd..63baa90e9 100644 --- a/api/session.go +++ b/api/session.go @@ -93,18 +93,9 @@ func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, } func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) { - r := s.c.newRequest("PUT", "/v1/session/create") - r.setWriteOptions(q) - r.obj = obj - rtt, resp, err := requireOK(s.c.doRequest(r)) - if err != nil { - return "", nil, err - } - defer resp.Body.Close() - - wm := &WriteMeta{RequestTime: rtt} var out struct{ ID string } - if err := decodeBody(resp, &out); err != nil { + wm, err := s.c.write("/v1/session/create", obj, &out, q) + if err != nil { return "", nil, err } return out.ID, wm, nil @@ -112,35 +103,20 @@ func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, // Destroy invalides a given session func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) { - r := s.c.newRequest("PUT", "/v1/session/destroy/"+id) - r.setWriteOptions(q) - rtt, resp, err := requireOK(s.c.doRequest(r)) + wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q) if err != nil { return nil, err } - resp.Body.Close() - - wm := &WriteMeta{RequestTime: rtt} return wm, nil } // Renew renews the TTL on a given session func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) { - r := s.c.newRequest("PUT", "/v1/session/renew/"+id) - r.setWriteOptions(q) - rtt, resp, err := requireOK(s.c.doRequest(r)) + var entries []*SessionEntry + wm, err := s.c.write("/v1/session/renew/"+id, nil, &entries, q) if err != nil { return nil, nil, err } - defer resp.Body.Close() - - wm := &WriteMeta{RequestTime: rtt} - - var entries []*SessionEntry - if err := decodeBody(resp, &entries); err != nil { - return nil, wm, err - } - if len(entries) > 0 { return entries[0], wm, nil } @@ -179,23 +155,11 @@ func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, d // Info looks up a single session func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) { - r := s.c.newRequest("GET", "/v1/session/info/"+id) - r.setQueryOptions(q) - rtt, resp, err := requireOK(s.c.doRequest(r)) + var entries []*SessionEntry + qm, err := s.c.query("/v1/session/info/"+id, &entries, q) if err != nil { return nil, nil, err } - defer resp.Body.Close() - - qm := &QueryMeta{} - parseQueryMeta(resp, qm) - qm.RequestTime = rtt - - var entries []*SessionEntry - if err := decodeBody(resp, &entries); err != nil { - return nil, nil, err - } - if len(entries) > 0 { return entries[0], qm, nil } @@ -204,20 +168,9 @@ func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, e // List gets sessions for a node func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) { - r := s.c.newRequest("GET", "/v1/session/node/"+node) - r.setQueryOptions(q) - rtt, resp, err := requireOK(s.c.doRequest(r)) - if err != nil { - return nil, nil, err - } - defer resp.Body.Close() - - qm := &QueryMeta{} - parseQueryMeta(resp, qm) - qm.RequestTime = rtt - var entries []*SessionEntry - if err := decodeBody(resp, &entries); err != nil { + qm, err := s.c.query("/v1/session/node/"+node, &entries, q) + if err != nil { return nil, nil, err } return entries, qm, nil @@ -225,20 +178,9 @@ func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMet // List gets all active sessions func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) { - r := s.c.newRequest("GET", "/v1/session/list") - r.setQueryOptions(q) - rtt, resp, err := requireOK(s.c.doRequest(r)) - if err != nil { - return nil, nil, err - } - defer resp.Body.Close() - - qm := &QueryMeta{} - parseQueryMeta(resp, qm) - qm.RequestTime = rtt - var entries []*SessionEntry - if err := decodeBody(resp, &entries); err != nil { + qm, err := s.c.query("/v1/session/list", &entries, q) + if err != nil { return nil, nil, err } return entries, qm, nil diff --git a/command/agent/command.go b/command/agent/command.go index e0ebfbfc7..b1da6751e 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/go-checkpoint" "github.com/hashicorp/go-syslog" "github.com/hashicorp/logutils" + scada "github.com/hashicorp/scada-client" "github.com/mitchellh/cli" ) @@ -45,6 +46,7 @@ type Command struct { rpcServer *AgentRPC httpServers []*HTTPServer dnsServer *DNSServer + scadaProvider *scada.Provider } // readConfig is responsible for setup of our configuration using @@ -76,6 +78,10 @@ func (c *Command) readConfig() *Config { cmdFlags.StringVar(&cmdConfig.BindAddr, "bind", "", "address to bind server listeners to") cmdFlags.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "address to advertise instead of bind addr") + cmdFlags.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", "infrastructure name in Atlas") + cmdFlags.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", "authentication token for Atlas") + cmdFlags.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, "auto-join with Atlas") + cmdFlags.IntVar(&cmdConfig.Protocol, "protocol", -1, "protocol version") cmdFlags.BoolVar(&cmdConfig.EnableSyslog, "syslog", false, @@ -327,8 +333,21 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.Ui.Output("Starting Consul agent RPC...") c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter) - if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { - servers, err := NewHTTPServers(agent, config, logOutput) + // Enable the SCADA integration + var scadaList net.Listener + if config.AtlasInfrastructure != "" { + provider, list, err := NewProvider(config, logOutput) + if err != nil { + agent.Shutdown() + c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err)) + return err + } + c.scadaProvider = provider + scadaList = list + } + + if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil { + servers, err := NewHTTPServers(agent, config, scadaList, logOutput) if err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err)) @@ -378,7 +397,6 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.checkpointResults(checkpoint.Check(updateParams)) }() } - return nil } @@ -586,10 +604,12 @@ func (c *Command) Run(args []string) int { if c.dnsServer != nil { defer c.dnsServer.Shutdown() } - for _, server := range c.httpServers { defer server.Shutdown() } + if c.scadaProvider != nil { + defer c.scadaProvider.Shutdown() + } // Join startup nodes if specified if err := c.startupJoin(config); err != nil { @@ -628,6 +648,12 @@ func (c *Command) Run(args []string) int { gossipEncrypted = c.agent.client.Encrypted() } + // Determine the Atlas cluster + atlas := "" + if config.AtlasInfrastructure != "" { + atlas = fmt.Sprintf("(Infrastructure: '%s' Join: %v)", config.AtlasInfrastructure, config.AtlasJoin) + } + // Let the agent know we've finished registration c.agent.StartSync() @@ -641,6 +667,7 @@ func (c *Command) Run(args []string) int { config.Ports.SerfLan, config.Ports.SerfWan)) c.Ui.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v", gossipEncrypted, config.VerifyOutgoing, config.VerifyIncoming)) + c.Ui.Info(fmt.Sprintf(" Atlas: %s", atlas)) // Enable log streaming c.Ui.Info("") @@ -815,6 +842,9 @@ Usage: consul agent [options] Options: -advertise=addr Sets the advertise address to use + -atlas=org/name Sets the Atlas infrastructure name, enables SCADA. + -atlas-join Enables auto-joining the Atlas cluster + -atlas-token=token Provides the Atlas API token -bootstrap Sets server to bootstrap mode -bind=0.0.0.0 Sets the bind address for cluster communication -bootstrap-expect=0 Sets server to expect bootstrap mode. diff --git a/command/agent/config.go b/command/agent/config.go index 6f457aa05..bd849d5fd 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -318,6 +318,23 @@ type Config struct { // HTTPAPIResponseHeaders are used to add HTTP header response fields to the HTTP API responses. HTTPAPIResponseHeaders map[string]string `mapstructure:"http_api_response_headers"` + // AtlasInfrastructure is the name of the infrastructure we belong to. e.g. hashicorp/stage + AtlasInfrastructure string `mapstructure:"atlas_infrastructure"` + + // AtlasToken is our authentication token from Atlas + AtlasToken string `mapstructure:"atlas_token" json:"-"` + + // AtlasACLToken is applied to inbound requests if no other token + // is provided. This takes higher precedence than the ACLToken. + // Without this, the ACLToken is used. If that is not specified either, + // then the 'anonymous' token is used. This can be set to 'anonymous' + // to reduce the Atlas privileges to below that of the ACLToken. + AtlasACLToken string `mapstructure:"atlas_acl_token" json:"-"` + + // AtlasJoin controls if Atlas will attempt to auto-join the node + // to it's cluster. Requires Atlas integration. + AtlasJoin bool `mapstructure:"atlas_join"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -941,6 +958,18 @@ func MergeConfig(a, b *Config) *Config { if b.UnixSockets.Perms != "" { result.UnixSockets.Perms = b.UnixSockets.Perms } + if b.AtlasInfrastructure != "" { + result.AtlasInfrastructure = b.AtlasInfrastructure + } + if b.AtlasToken != "" { + result.AtlasToken = b.AtlasToken + } + if b.AtlasACLToken != "" { + result.AtlasACLToken = b.AtlasACLToken + } + if b.AtlasJoin { + result.AtlasJoin = true + } if len(b.HTTPAPIResponseHeaders) != 0 { if result.HTTPAPIResponseHeaders == nil { diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 699f8de88..b6cdac205 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -633,6 +633,26 @@ func TestDecodeConfig(t *testing.T) { if config.HTTPAPIResponseHeaders["X-XSS-Protection"] != "1; mode=block" { t.Fatalf("bad: %#v", config) } + + // Atlas configs + input = `{"atlas_infrastructure": "hashicorp/prod", "atlas_token": "abcdefg", "atlas_acl_token": "123456789", "atlas_join": true}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if config.AtlasInfrastructure != "hashicorp/prod" { + t.Fatalf("bad: %#v", config) + } + if config.AtlasToken != "abcdefg" { + t.Fatalf("bad: %#v", config) + } + if config.AtlasACLToken != "123456789" { + t.Fatalf("bad: %#v", config) + } + if !config.AtlasJoin { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_invalidKeys(t *testing.T) { @@ -1096,6 +1116,10 @@ func TestMergeConfig(t *testing.T) { Perms: "0700", }, }, + AtlasInfrastructure: "hashicorp/prod", + AtlasToken: "123456789", + AtlasACLToken: "abcdefgh", + AtlasJoin: true, } c := MergeConfig(a, b) diff --git a/command/agent/http.go b/command/agent/http.go index 91ae3a1d0..52e75e6ee 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -19,6 +19,14 @@ import ( "github.com/mitchellh/mapstructure" ) +var ( + // scadaHTTPAddr is the address associated with the + // HTTPServer. When populating an ACL token for a request, + // this is checked to switch between the ACLToken and + // AtlasACLToken + scadaHTTPAddr = "SCADA" +) + // HTTPServer is used to wrap an Agent and expose various API's // in a RESTful manner type HTTPServer struct { @@ -32,15 +40,11 @@ type HTTPServer struct { // NewHTTPServers starts new HTTP servers to provide an interface to // the agent. -func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) { - var tlsConfig *tls.Config - var list net.Listener - var httpAddr net.Addr - var err error +func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) { var servers []*HTTPServer if config.Ports.HTTPS > 0 { - httpAddr, err = config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS) + httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS) if err != nil { return nil, err } @@ -54,7 +58,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS NodeName: config.NodeName, ServerName: config.ServerName} - tlsConfig, err = tlsConf.IncomingTLSConfig() + tlsConfig, err := tlsConf.IncomingTLSConfig() if err != nil { return nil, err } @@ -64,7 +68,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) } - list = tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) + list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) // Create the mux mux := http.NewServeMux() @@ -86,7 +90,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS } if config.Ports.HTTP > 0 { - httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) + httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) if err != nil { return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err) } @@ -107,6 +111,7 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) } + var list net.Listener if isSocket { // Set up ownership/permission bits on the socket file if err := setFilePermissions(socketPath, config.UnixSockets); err != nil { @@ -136,6 +141,26 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS servers = append(servers, srv) } + if scada != nil { + // Create the mux + mux := http.NewServeMux() + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: scada, + logger: log.New(logOutput, "", log.LstdFlags), + uiDir: config.UiDir, + addr: scadaHTTPAddr, + } + srv.registerHandlers(false) // Never allow debug for SCADA + + // Start the server + go http.Serve(scada, mux) + servers = append(servers, srv) + } + return servers, nil } @@ -159,7 +184,7 @@ func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { // Shutdown is used to shutdown the HTTP server func (s *HTTPServer) Shutdown() { if s != nil { - s.logger.Printf("[DEBUG] http: Shutting down http server(%v)", s.addr) + s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr) s.listener.Close() } } @@ -241,7 +266,10 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { if s.uiDir != "" { // Static file serving done from /ui/ s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.uiDir)))) + } + // Enable the special endpoints for UI or SCADA + if s.uiDir != "" || s.agent.config.AtlasInfrastructure != "" { // API's are under /internal/ui/ to avoid conflict s.mux.HandleFunc("/v1/internal/ui/nodes", s.wrap(s.UINodes)) s.mux.HandleFunc("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) @@ -422,9 +450,17 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) { func (s *HTTPServer) parseToken(req *http.Request, token *string) { if other := req.URL.Query().Get("token"); other != "" { *token = other - } else if *token == "" { - *token = s.agent.config.ACLToken + return } + + // Set the AtlasACLToken if SCADA + if s.addr == scadaHTTPAddr && s.agent.config.AtlasACLToken != "" { + *token = s.agent.config.AtlasACLToken + return + } + + // Set the default ACLToken + *token = s.agent.config.ACLToken } // parse is a convenience method for endpoints that need diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 19e8f95af..a6ec471d5 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -36,7 +36,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe t.Fatalf("err: %v", err) } conf.UiDir = uiDir - servers, err := NewHTTPServers(agent, conf, agent.logOutput) + servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput) if err != nil { t.Fatalf("err: %v", err) } @@ -146,7 +146,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) { defer os.RemoveAll(dir) // Try to start the server with the same path anyways. - if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil { + if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil { t.Fatalf("err: %s", err) } @@ -429,6 +429,67 @@ func TestParseConsistency_Invalid(t *testing.T) { } } +// Test ACL token is resolved in correct order +func TestACLResolution(t *testing.T) { + var token string + // Request without token + req, err := http.NewRequest("GET", + "/v1/catalog/nodes", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Request with explicit token + reqToken, err := http.NewRequest("GET", + "/v1/catalog/nodes?token=foo", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + httpTest(t, func(srv *HTTPServer) { + // Check when no token is set + srv.agent.config.ACLToken = "" + srv.parseToken(req, &token) + if token != "" { + t.Fatalf("bad: %s", token) + } + + // Check when ACLToken set + srv.agent.config.ACLToken = "agent" + srv.parseToken(req, &token) + if token != "agent" { + t.Fatalf("bad: %s", token) + } + + // Check when AtlasACLToken set, wrong server + srv.agent.config.AtlasACLToken = "atlas" + srv.parseToken(req, &token) + if token != "agent" { + t.Fatalf("bad: %s", token) + } + + // Check when AtlasACLToken set, correct server + srv.addr = scadaHTTPAddr + srv.parseToken(req, &token) + if token != "atlas" { + t.Fatalf("bad: %s", token) + } + + // Check when AtlasACLToken not, correct server + srv.agent.config.AtlasACLToken = "" + srv.parseToken(req, &token) + if token != "agent" { + t.Fatalf("bad: %s", token) + } + + // Explicit token has highest precedence + srv.parseToken(reqToken, &token) + if token != "foo" { + t.Fatalf("bad: %s", token) + } + }) +} + // assertIndex tests that X-Consul-Index is set and non-zero func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { header := resp.Header().Get("X-Consul-Index") diff --git a/command/agent/scada.go b/command/agent/scada.go new file mode 100644 index 000000000..2fd3d543d --- /dev/null +++ b/command/agent/scada.go @@ -0,0 +1,192 @@ +package agent + +import ( + "crypto/tls" + "errors" + "fmt" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/hashicorp/scada-client" +) + +const ( + // providerService is the service name we use + providerService = "consul" + + // resourceType is the type of resource we represent + // when connecting to SCADA + resourceType = "infrastructures" +) + +// ProviderService returns the service information for the provider +func ProviderService(c *Config) *client.ProviderService { + return &client.ProviderService{ + Service: providerService, + ServiceVersion: fmt.Sprintf("%s%s", c.Version, c.VersionPrerelease), + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": strconv.FormatBool(c.AtlasJoin), + "datacenter": c.Datacenter, + "server": strconv.FormatBool(c.Server), + }, + ResourceType: resourceType, + } +} + +// ProviderConfig returns the configuration for the SCADA provider +func ProviderConfig(c *Config) *client.ProviderConfig { + return &client.ProviderConfig{ + Service: ProviderService(c), + Handlers: map[string]client.CapabilityProvider{ + "http": nil, + }, + ResourceGroup: c.AtlasInfrastructure, + Token: c.AtlasToken, + } +} + +// NewProvider creates a new SCADA provider using the +// given configuration. Requests for the HTTP capability +// are passed off to the listener that is returned. +func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener, error) { + // Get the configuration of the provider + config := ProviderConfig(c) + config.LogOutput = logOutput + + // SCADA_INSECURE env variable is used for testing to disable + // TLS certificate verification. + if os.Getenv("SCADA_INSECURE") != "" { + config.TLSConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } + + // Create an HTTP listener and handler + list := newScadaListener(c.AtlasInfrastructure) + config.Handlers["http"] = func(capability string, meta map[string]string, + conn io.ReadWriteCloser) error { + return list.PushRWC(conn) + } + + // Create the provider + provider, err := client.NewProvider(config) + if err != nil { + list.Close() + return nil, nil, err + } + return provider, list, nil +} + +// scadaListener is used to return a net.Listener for +// incoming SCADA connections +type scadaListener struct { + addr *scadaAddr + pending chan net.Conn + + closed bool + closedCh chan struct{} + l sync.Mutex +} + +// newScadaListener returns a new listener +func newScadaListener(infra string) *scadaListener { + l := &scadaListener{ + addr: &scadaAddr{infra}, + pending: make(chan net.Conn), + closedCh: make(chan struct{}), + } + return l +} + +// PushRWC is used to push a io.ReadWriteCloser as a net.Conn +func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error { + // Check if this already implements net.Conn + if nc, ok := conn.(net.Conn); ok { + return s.Push(nc) + } + + // Wrap to implement the interface + wrapped := &scadaRWC{conn, s.addr} + return s.Push(wrapped) +} + +// Push is used to add a connection to the queu +func (s *scadaListener) Push(conn net.Conn) error { + select { + case s.pending <- conn: + return nil + case <-time.After(time.Second): + return fmt.Errorf("accept timed out") + case <-s.closedCh: + return fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Accept() (net.Conn, error) { + select { + case conn := <-s.pending: + return conn, nil + case <-s.closedCh: + return nil, fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Close() error { + s.l.Lock() + defer s.l.Unlock() + if s.closed { + return nil + } + s.closed = true + close(s.closedCh) + return nil +} + +func (s *scadaListener) Addr() net.Addr { + return s.addr +} + +// scadaAddr is used to return a net.Addr for SCADA +type scadaAddr struct { + infra string +} + +func (s *scadaAddr) Network() string { + return "SCADA" +} + +func (s *scadaAddr) String() string { + return fmt.Sprintf("SCADA::Atlas::%s", s.infra) +} + +type scadaRWC struct { + io.ReadWriteCloser + addr *scadaAddr +} + +func (s *scadaRWC) LocalAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) RemoteAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) SetDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetReadDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetWriteDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} diff --git a/command/agent/scada_test.go b/command/agent/scada_test.go new file mode 100644 index 000000000..e142f54ae --- /dev/null +++ b/command/agent/scada_test.go @@ -0,0 +1,104 @@ +package agent + +import ( + "net" + "reflect" + "testing" + + "github.com/hashicorp/scada-client" +) + +func TestProviderService(t *testing.T) { + conf := DefaultConfig() + conf.Version = "0.5.0" + conf.VersionPrerelease = "rc1" + conf.AtlasJoin = true + conf.Server = true + ps := ProviderService(conf) + + expect := &client.ProviderService{ + Service: "consul", + ServiceVersion: "0.5.0rc1", + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": "true", + "datacenter": "dc1", + "server": "true", + }, + ResourceType: "infrastructures", + } + + if !reflect.DeepEqual(ps, expect) { + t.Fatalf("bad: %v", ps) + } +} + +func TestProviderConfig(t *testing.T) { + conf := DefaultConfig() + conf.Version = "0.5.0" + conf.VersionPrerelease = "rc1" + conf.AtlasJoin = true + conf.Server = true + conf.AtlasInfrastructure = "armon/test" + conf.AtlasToken = "foobarbaz" + pc := ProviderConfig(conf) + + expect := &client.ProviderConfig{ + Service: &client.ProviderService{ + Service: "consul", + ServiceVersion: "0.5.0rc1", + Capabilities: map[string]int{ + "http": 1, + }, + Meta: map[string]string{ + "auto-join": "true", + "datacenter": "dc1", + "server": "true", + }, + ResourceType: "infrastructures", + }, + Handlers: map[string]client.CapabilityProvider{ + "http": nil, + }, + ResourceGroup: "armon/test", + Token: "foobarbaz", + } + + if !reflect.DeepEqual(pc, expect) { + t.Fatalf("bad: %v", pc) + } +} + +func TestSCADAListener(t *testing.T) { + list := newScadaListener("armon/test") + defer list.Close() + + var raw interface{} = list + _, ok := raw.(net.Listener) + if !ok { + t.Fatalf("bad") + } + + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + go list.Push(a) + out, err := list.Accept() + if err != nil { + t.Fatalf("err: %v", err) + } + if out != a { + t.Fatalf("bad") + } +} + +func TestSCADAAddr(t *testing.T) { + var addr interface{} = &scadaAddr{"armon/test"} + _, ok := addr.(net.Addr) + if !ok { + t.Fatalf("bad") + } +} diff --git a/command/agent/ui_endpoint.go b/command/agent/ui_endpoint.go index f9eb72a57..425e392ce 100644 --- a/command/agent/ui_endpoint.go +++ b/command/agent/ui_endpoint.go @@ -1,10 +1,11 @@ package agent import ( - "github.com/hashicorp/consul/consul/structs" "net/http" "sort" "strings" + + "github.com/hashicorp/consul/consul/structs" ) // ServiceSummary is used to summarize a service @@ -19,99 +20,88 @@ type ServiceSummary struct { // UINodes is used to list the nodes in a given datacenter. We return a // NodeDump which provides overview information for all the nodes func (s *HTTPServer) UINodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - // Get the datacenter - var dc string - s.parseDC(req, &dc) - - // Try to ge ta node dump - var dump structs.NodeDump - if err := s.getNodeDump(resp, dc, "", &dump); err != nil { - return nil, err + // Parse arguments + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil } - return dump, nil + // Make the RPC request + var out structs.IndexedNodeDump + defer setMeta(resp, &out.QueryMeta) +RPC: + if err := s.agent.RPC("Internal.NodeDump", &args, &out); err != nil { + // Retry the request allowing stale data if no leader + if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale { + args.AllowStale = true + goto RPC + } + return nil, err + } + return out.Dump, nil } // UINodeInfo is used to get info on a single node in a given datacenter. We return a // NodeInfo which provides overview information for the node func (s *HTTPServer) UINodeInfo(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - // Get the datacenter - var dc string - s.parseDC(req, &dc) + // Parse arguments + args := structs.NodeSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } // Verify we have some DC, or use the default - node := strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/node/") - if node == "" { + args.Node = strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/node/") + if args.Node == "" { resp.WriteHeader(400) resp.Write([]byte("Missing node name")) return nil, nil } - // Try to get a node dump - var dump structs.NodeDump - if err := s.getNodeDump(resp, dc, node, &dump); err != nil { + // Make the RPC request + var out structs.IndexedNodeDump + defer setMeta(resp, &out.QueryMeta) +RPC: + if err := s.agent.RPC("Internal.NodeInfo", &args, &out); err != nil { + // Retry the request allowing stale data if no leader + if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale { + args.AllowStale = true + goto RPC + } return nil, err } // Return only the first entry - if len(dump) > 0 { - return dump[0], nil + if len(out.Dump) > 0 { + return out.Dump[0], nil } return nil, nil } -// getNodeDump is used to get a dump of all node data. We make a best effort by -// reading stale data in the case of an availability outage. -func (s *HTTPServer) getNodeDump(resp http.ResponseWriter, dc, node string, dump *structs.NodeDump) error { - var args interface{} - var method string - var allowStale *bool - - if node == "" { - raw := structs.DCSpecificRequest{Datacenter: dc} - method = "Internal.NodeDump" - allowStale = &raw.AllowStale - args = &raw - } else { - raw := &structs.NodeSpecificRequest{Datacenter: dc, Node: node} - method = "Internal.NodeInfo" - allowStale = &raw.AllowStale - args = &raw - } - var out structs.IndexedNodeDump - defer setMeta(resp, &out.QueryMeta) - -START: - if err := s.agent.RPC(method, args, &out); err != nil { - // Retry the request allowing stale data if no leader. The UI should continue - // to function even during an outage - if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !*allowStale { - *allowStale = true - goto START - } - return err - } - - // Set the result - *dump = out.Dump - return nil -} - // UIServices is used to list the services in a given datacenter. We return a // ServiceSummary which provides overview information for the service func (s *HTTPServer) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - // Get the datacenter - var dc string - s.parseDC(req, &dc) + // Parse arguments + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } - // Get the full node dump... - var dump structs.NodeDump - if err := s.getNodeDump(resp, dc, "", &dump); err != nil { + // Make the RPC request + var out structs.IndexedNodeDump + defer setMeta(resp, &out.QueryMeta) +RPC: + if err := s.agent.RPC("Internal.NodeDump", &args, &out); err != nil { + // Retry the request allowing stale data if no leader + if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale { + args.AllowStale = true + goto RPC + } return nil, err } // Generate the summary - return summarizeServices(dump), nil + return summarizeServices(out.Dump), nil } func summarizeServices(dump structs.NodeDump) []*ServiceSummary { diff --git a/website/source/docs/agent/basics.html.markdown b/website/source/docs/agent/basics.html.markdown index 3aff65da9..d520b5533 100644 --- a/website/source/docs/agent/basics.html.markdown +++ b/website/source/docs/agent/basics.html.markdown @@ -39,6 +39,7 @@ $ consul agent -data-dir=/tmp/consul Server: false (bootstrap: false) Client Addr: 127.0.0.1 (HTTP: 8500, DNS: 8600, RPC: 8400) Cluster Addr: 192.168.1.43 (LAN: 8301, WAN: 8302) + Atlas: (Infrastructure: 'hashicorp/test' Join: true) ==> Log data will now stream in as it occurs: @@ -75,6 +76,11 @@ There are several important messages that `consul agent` outputs: Consul agents in a cluster. Not all Consul agents in a cluster have to use the same port, but this address **MUST** be reachable by all other nodes. +* **Atlas**: This shows the [Atlas infrastructure](https://atlas.hashicorp.com) + the node is registered with. It also indicates if auto join is enabled. + The Atlas infrastructure is set using `-atlas` and auto-join is enabled by + setting `-atlas-join`. + ## Stopping an Agent An agent can be stopped in two ways: gracefully or forcefully. To gracefully diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 43f9dc55b..0e0f132d8 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -40,6 +40,17 @@ The options below are all specified on the command-line. If this address is not routable, the node will be in a constant flapping state as other nodes will treat the non-routability as a failure. +* `-atlas` - This flag enables [Atlas](https://atlas.hashicorp.com) integration. + It is used to provide the Atlas infrastructure name and the SCADA connection. + This enables Atlas features such as the dashboard and node auto joining. + +* `-atlas-join` - When set, enables auto-join via Atlas. Atlas will track the most + recent members to join the infrastructure named by `-atlas` and automatically + join them on start. For servers, the LAN and WAN pool are both joined. + +* `-atlas-token` - Provides the Atlas API authentication token. This can also be provided + using the `ATLAS_TOKEN` environment variable. Required for use with Atlas. + * `-bootstrap` - This flag is used to control if a server is in "bootstrap" mode. It is important that no more than one server *per* data center be running in this mode. Technically, a server in bootstrap mode is allowed to self-elect as the Raft leader. It is important that only a single node is in this mode; @@ -260,6 +271,16 @@ definitions support being updated during a reload. * `advertise_addr` - Equivalent to the [`-advertise` command-line flag](#advertise). +* `atlas_acl_token` - When provided, any requests made by Atlas will use this ACL + token unless explicitly overriden. When not provided the `acl_token` is used. + This can be set to 'anonymous' to reduce permission below that of `acl_token`. + +* `atlas_infrastructure` - Equivalent to the [`-atlas` command-line flag](#atlas). + +* `atlas_join` - Equivalent to the [`-atlas-join` command-line flag](#atlas_join). + +* `atlas_token` - Equivalent to the [`-atlas-token` command-line flag](#atlas_token). + * `bootstrap` - Equivalent to the [`-bootstrap` command-line flag](#bootstrap_anchor). * `bootstrap_expect` - Equivalent to the [`-bootstrap-expect` command-line flag](#bootstrap_expect). diff --git a/website/source/docs/faq.html.markdown b/website/source/docs/faq.html.markdown index 56fdd67f5..2905375d4 100644 --- a/website/source/docs/faq.html.markdown +++ b/website/source/docs/faq.html.markdown @@ -28,4 +28,14 @@ and can be disabled. See [`disable_anonymous_signature`](/docs/agent/options.html#disable_anonymous_signature) and [`disable_update_check`](/docs/agent/options.html#disable_update_check). +## Q: How does Atlas integration work? + +Consul makes use of a HashiCorp service called [SCADA](http://scada.hashicorp.com) +which stands for Supervisory Control And Data Acquisition. The SCADA system allows +clients to maintain a long-running connection to Atlas which is used to make requests +to Consul agents for features like the dashboard and auto joining. Standard ACLs can +be applied to the SCADA connection, which has no enhanced or elevated privileges. +Using the SCADA service is optional and only enabled by opt-in. + +See the [Atlas integration guide](/docs/guides/atlas.html). diff --git a/website/source/docs/guides/atlas.html.markdown b/website/source/docs/guides/atlas.html.markdown new file mode 100644 index 000000000..86e76cac8 --- /dev/null +++ b/website/source/docs/guides/atlas.html.markdown @@ -0,0 +1,59 @@ +--- +layout: "docs" +page_title: "Atlas Integration" +sidebar_current: "docs-guides-atlas" +description: |- + This guide covers how to integrate Atlas with Consul to provide features like an infrastructure dashboard and automatic cluster joining. +--- + +# Atlas Integration + +[Atlas](https://atlas.hashicorp.com) is service provided by HashiCorp to deploy applications and manage infrastructure. +Starting with Consul 0.5, it is possible to integrate Consul with Atlas. This is done by registering a node as part +of an Atlas infrastructure (specified with the `-atlas` flag). Consul maintains a long running connection to the +[SCADA](http://scada.hashicorp.com) service which allows Atlas to retrieve data and control nodes. + +Data acquisition allows Atlas to display the state of the Consul cluster in its dashboard as well as enabling +alerts to be setup using health checks. Remote control enables Atlas to provide features like the auto joinining +nodes. + +## Enabling Atlas Integration + +To enable Atlas integration, you must specify the name of the Atlas infrastructure and the Atlas authentication +token. The Atlas infrastructure name can be set either with the `-atlas` CLI flag, or with the `atlas_infrastructure` +[configuration option](/docs/agent/options.html). The Atlas token is set with the `-atlas-token` CLI flag, `atlas_token` +configuration option, or `ATLAS_TOKEN` environment variable. + +To verify the integration, either run the agent with `debug` level logging or use `consul monitor -log-level=debug` +and look for a line like: + + [DEBUG] scada-client: assigned session '406ca55d-1801-f964-2942-45f5f9df3995' + +This shows that the Consul agent was successfully able to register with the SCADA service. + +## Using Auto-Join + +Once integrated with Atlas, the auto join feature can be used to have nodes automatically join other +peers in their datacenter. Server nodes will automatically join peer LAN nodes and other WAN nodes. +Client nodes will only join other LAN nodes in their datacenter. + +Auto join is enabled with the `-atlas-join` CLI flag or the `atlas_join` configuration option. + +## Securing Atlas + +The connection to Atlas does not have elevated privileges. API requests made by Atlas +are served in the same way any other HTTP request is made. If ACLs are enabled, it is possible to +force an Atlas ACL token to be used instead of the agent's default token. + +When ACLs are enabled, the `atlas_acl_token` configuration option can be specified. This changes +the ACL token resolution order to be: + +1. Request specific token provided by `?token=`. These tokens are set in the Atlas UI. +2. The `atlas_acl_token` if configured. +3. The `acl_token` if configured. +4. The `anonymous` token. + +Because the `acl_token` typically has elevated permissions compared to the `anonymous` token, +the `atlas_acl_token` can be set to `anonymous` to drop privileges that would otherwise be +inherited from the agent. + diff --git a/website/source/docs/guides/index.html.markdown b/website/source/docs/guides/index.html.markdown index 6e618b048..c1a8f946b 100644 --- a/website/source/docs/guides/index.html.markdown +++ b/website/source/docs/guides/index.html.markdown @@ -14,6 +14,8 @@ guidance to do them safely. The following guides are available: +* [Atlas Integration](/docs/guides/atlas.html) - This guide covers how to integrate [Atlas](https://atlas.hashicorp.com) with Consul. + * [Adding/Removing Servers](/docs/guides/servers.html) - This guide covers how to safely add and remove Consul servers from the cluster. This should be done carefully to avoid availability outages. * [Bootstrapping](/docs/guides/bootstrapping.html) - This guide covers bootstrapping a new datacenter. This covers safely adding the initial Consul servers. diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 7b99f15fa..1da41cd05 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -196,7 +196,11 @@ > Guides -