open-consul/agent/agent_endpoint.go

1682 lines
52 KiB
Go
Raw Normal View History

2014-01-04 01:15:51 +00:00
package agent
import (
"encoding/json"
"fmt"
2014-01-04 01:15:51 +00:00
"net/http"
"strconv"
2014-01-04 01:15:51 +00:00
"strings"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul"
New command: consul debug (#4754) * agent/debug: add package for debugging, host info * api: add v1/agent/host endpoint * agent: add v1/agent/host endpoint * command/debug: implementation of static capture * command/debug: tests and only configured targets * agent/debug: add basic test for host metrics * command/debug: add methods for dynamic data capture * api: add debug/pprof endpoints * command/debug: add pprof * command/debug: timing, wg, logs to disk * vendor: add gopsutil/disk * command/debug: add a usage section * website: add docs for consul debug * agent/host: require operator:read * api/host: improve docs and no retry timing * command/debug: fail on extra arguments * command/debug: fixup file permissions to 0644 * command/debug: remove server flags * command/debug: improve clarity of usage section * api/debug: add Trace for profiling, fix profile * command/debug: capture profile and trace at the same time * command/debug: add index document * command/debug: use "clusters" in place of members * command/debug: remove address in output * command/debug: improve comment on metrics sleep * command/debug: clarify usage * agent: always register pprof handlers and protect This will allow us to avoid a restart of a target agent for profiling by always registering the pprof handlers. Given this is a potentially sensitive path, it is protected with an operator:read ACL and enable debug being set to true on the target agent. enable_debug still requires a restart. If ACLs are disabled, enable_debug is sufficient. * command/debug: use trace.out instead of .prof More in line with golang docs. * agent: fix comment wording * agent: wrap table driven tests in t.run()
2018-10-17 20:20:35 +00:00
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/structs"
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
token_store "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/logging/monitor"
"github.com/hashicorp/consul/types"
2014-01-04 01:15:51 +00:00
)
2017-04-21 00:46:29 +00:00
type Self struct {
Config interface{}
DebugConfig map[string]interface{}
Coord *coordinate.Coordinate
Member serf.Member
Stats map[string]map[string]string
Meta map[string]string
XDS *XDSSelf `json:"xDS,omitempty"`
}
type XDSSelf struct {
SupportedProxies map[string][]string
// Port could be used for either TLS or plain-text communication
// up through version 1.14. In order to maintain backwards-compatibility,
// Port will now default to TLS and fallback to the standard port value.
2022-09-01 17:32:11 +00:00
// DEPRECATED: Use Ports field instead
Port int
Ports GRPCPorts
}
// GRPCPorts is used to hold the external GRPC server's port numbers.
type GRPCPorts struct {
// Technically, this port is not always plain-text as of 1.14, but will be in a future release.
Plaintext int
TLS int
}
func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentReadAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates {
var err error
if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
var xds *XDSSelf
if s.agent.xdsServer != nil {
xds = &XDSSelf{
SupportedProxies: map[string][]string{
"envoy": proxysupport.EnvoyVersions,
},
// Prefer the TLS port. See comment on the XDSSelf struct for details.
2022-09-01 17:32:11 +00:00
Port: s.agent.config.GRPCTLSPort,
Ports: GRPCPorts{
Plaintext: s.agent.config.GRPCPort,
TLS: s.agent.config.GRPCTLSPort,
},
}
// Fallback to standard port if TLS is not enabled.
2022-09-01 17:32:11 +00:00
if s.agent.config.GRPCTLSPort <= 0 {
xds.Port = s.agent.config.GRPCPort
}
}
config := struct {
Datacenter string
PrimaryDatacenter string
NodeName string
NodeID string
Partition string `json:",omitempty"`
Revision string
Server bool
Version string
BuildDate string
}{
Datacenter: s.agent.config.Datacenter,
PrimaryDatacenter: s.agent.config.PrimaryDatacenter,
NodeName: s.agent.config.NodeName,
NodeID: string(s.agent.config.NodeID),
Partition: s.agent.config.PartitionOrEmpty(),
Revision: s.agent.config.Revision,
Server: s.agent.config.ServerMode,
// We expect the ent version to be part of the reported version string, and that's now part of the metadata, not the actual version.
Version: s.agent.config.VersionWithMetadata(),
BuildDate: s.agent.config.BuildDate.Format(time.RFC3339),
}
2017-04-21 00:46:29 +00:00
return Self{
Config: config,
DebugConfig: s.agent.config.Sanitized(),
Coord: cs[s.agent.config.SegmentName],
Member: s.agent.AgentLocalMember(),
Stats: s.agent.Stats(),
2017-08-28 12:17:13 +00:00
Meta: s.agent.State.Metadata(),
XDS: xds,
}, nil
}
// acceptsOpenMetricsMimeType returns true if mime type is Prometheus-compatible
func acceptsOpenMetricsMimeType(acceptHeader string) bool {
mimeTypes := strings.Split(acceptHeader, ",")
for _, v := range mimeTypes {
mimeInfo := strings.Split(v, ";")
if len(mimeInfo) > 0 {
rawMime := strings.ToLower(strings.Trim(mimeInfo[0], " "))
if rawMime == "application/openmetrics-text" {
return true
}
if rawMime == "text/plain" && (len(mimeInfo) > 1 && strings.Trim(mimeInfo[1], " ") == "version=0.4.0") {
return true
}
}
}
return false
}
// enablePrometheusOutput will look for Prometheus mime-type or format Query parameter the same way as Nomad
func enablePrometheusOutput(req *http.Request) bool {
if format := req.URL.Query().Get("format"); format == "prometheus" {
return true
}
return acceptsOpenMetricsMimeType(req.Header.Get("Accept"))
}
func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2017-08-08 20:05:38 +00:00
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
2017-08-08 20:05:38 +00:00
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentReadAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
2017-08-08 20:05:38 +00:00
}
if enablePrometheusOutput(req) {
if s.agent.config.Telemetry.PrometheusOpts.Expiration < 1 {
return nil, CodeWithPayloadError{
StatusCode: http.StatusUnsupportedMediaType,
Reason: "Prometheus is not enabled since its retention time is not positive",
ContentType: "text/plain",
}
}
handlerOptions := promhttp.HandlerOpts{
ErrorLog: s.agent.logger.StandardLogger(&hclog.StandardLoggerOptions{
InferLevels: true,
}),
ErrorHandling: promhttp.ContinueOnError,
}
2017-08-08 20:05:38 +00:00
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions)
handler.ServeHTTP(resp, req)
return nil, nil
}
return s.agent.baseDeps.MetricsConfig.Handler.DisplayMetrics(resp, req)
2017-08-08 20:05:38 +00:00
}
func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentReadAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
flusher, ok := resp.(http.Flusher)
if !ok {
return nil, fmt.Errorf("streaming not supported")
}
resp.WriteHeader(http.StatusOK)
// 0 byte write is needed before the Flush call so that if we are using
// a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte(""))
flusher.Flush()
enc := metricsEncoder{
logger: s.agent.logger,
encoder: json.NewEncoder(resp),
flusher: flusher,
}
enc.encoder.SetIndent("", " ")
s.agent.baseDeps.MetricsConfig.Handler.Stream(req.Context(), enc)
return nil, nil
}
type metricsEncoder struct {
logger hclog.Logger
encoder *json.Encoder
flusher http.Flusher
}
func (m metricsEncoder) Encode(summary interface{}) error {
if err := m.encoder.Encode(summary); err != nil {
m.logger.Error("failed to encode metrics summary", "error", err)
return err
}
m.flusher.Flush()
return nil
}
func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
return nil, s.agent.ReloadConfig()
}
func buildAgentService(s *structs.NodeService, dc string) api.AgentService {
weights := api.AgentWeights{Passing: 1, Warning: 1}
if s.Weights != nil {
if s.Weights.Passing > 0 {
weights.Passing = s.Weights.Passing
}
weights.Warning = s.Weights.Warning
}
var taggedAddrs map[string]api.ServiceAddress
if len(s.TaggedAddresses) > 0 {
taggedAddrs = make(map[string]api.ServiceAddress)
for k, v := range s.TaggedAddresses {
taggedAddrs[k] = v.ToAPIServiceAddress()
}
}
as := api.AgentService{
Kind: api.ServiceKind(s.Kind),
ID: s.ID,
Service: s.Service,
Tags: s.Tags,
Meta: s.Meta,
Port: s.Port,
Address: s.Address,
SocketPath: s.SocketPath,
TaggedAddresses: taggedAddrs,
EnableTagOverride: s.EnableTagOverride,
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
Weights: weights,
Datacenter: dc,
}
if as.Tags == nil {
as.Tags = []string{}
}
if as.Meta == nil {
as.Meta = map[string]string{}
}
// Attach Proxy config if exists
if s.Kind == structs.ServiceKindConnectProxy || s.IsGateway() {
as.Proxy = s.Proxy.ToAPI()
}
// Attach Connect configs if they exist.
if s.Connect.Native {
as.Connect = &api.AgentServiceConnect{
Native: true,
}
}
fillAgentServiceEnterpriseMeta(&as, &s.EnterpriseMeta)
return as
}
func (s *HTTPHandlers) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
return nil, err
}
var filterExpression string
s.parseFilter(req, &filterExpression)
s.defaultMetaPartitionToAgent(&entMeta)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil)
if err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
// NOTE: we're explicitly fetching things in the requested partition and
// namespace here.
services := s.agent.State.Services(&entMeta)
2017-04-28 01:22:07 +00:00
// Convert into api.AgentService since that includes Connect config but so far
// NodeService doesn't need to internally. They are otherwise identical since
// that is the struct used in client for reading the one we output here
// anyway.
agentSvcs := make(map[string]*api.AgentService)
for id, svc := range services {
agentService := buildAgentService(svc, s.agent.config.Datacenter)
agentSvcs[id.ID] = &agentService
2017-04-28 01:22:07 +00:00
}
filter, err := bexpr.CreateFilter(filterExpression, nil, agentSvcs)
if err != nil {
return nil, err
}
raw, err := filter.Execute(agentSvcs)
if err != nil {
return nil, err
}
agentSvcs = raw.(map[string]*api.AgentService)
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure total (and the filter-by-acls header we set below)
// do not include results that would be filtered out even if the user did have
// permission.
total := len(agentSvcs)
if err := s.agent.filterServicesWithAuthorizer(authz, agentSvcs); err != nil {
return nil, err
}
// Set the X-Consul-Results-Filtered-By-ACLs header, but only if the user is
// authenticated (to prevent information leaking).
//
// This is done automatically for HTTP endpoints that proxy to an RPC endpoint
// that sets QueryMeta.ResultsFilteredByACLs, but must be done manually for
// agent-local endpoints.
//
// For more information see the comment on: Server.maskResultsFilteredByACLs.
if token != "" {
setResultsFilteredByACLs(resp, total != len(agentSvcs))
}
return agentSvcs, nil
}
// GET /v1/agent/service/:service_id
//
// Returns the service definition for a single local services and allows
// blocking watch using hash-based blocking.
func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the proxy ID. Note that this is the ID of a proxy's service instance.
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/")
// Maybe block
var queryOpts structs.QueryOptions
if parseWait(resp, req, &queryOpts) {
// parseWait returns an error itself
return nil, nil
}
// Parse the token
var token string
s.parseToken(req, &token)
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
return nil, err
}
// need to resolve to default the meta
s.defaultMetaPartitionToAgent(&entMeta)
2022-06-01 17:17:14 +00:00
_, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil)
if err != nil {
return nil, err
}
// Parse hash specially. Eventually this should happen in parseWait and end up
// in QueryOptions but I didn't want to make very general changes right away.
hash := req.URL.Query().Get("hash")
sid := structs.NewServiceID(id, &entMeta)
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
dc := s.agent.config.Datacenter
resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime,
func(ws memdb.WatchSet) (string, interface{}, error) {
svcState := s.agent.State.ServiceState(sid)
if svcState == nil {
return "", nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("unknown service ID: %s", sid.String())}
}
svc := svcState.Service
// Setup watch on the service
ws.Add(svcState.WatchCh)
// Check ACLs.
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return "", nil, err
}
var authzContext acl.AuthorizerContext
svc.FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(svc.Service, &authzContext); err != nil {
return "", nil, err
}
// Calculate the content hash over the response, minus the hash field
aSvc := buildAgentService(svc, dc)
reply := &aSvc
// TODO(partitions): do we need to do anything here?
rawHash, err := hashstructure.Hash(reply, nil)
if err != nil {
return "", nil, err
}
// Include the ContentHash in the response body
reply.ContentHash = fmt.Sprintf("%x", rawHash)
return reply.ContentHash, reply, nil
},
)
if resultHash != "" {
resp.Header().Set("X-Consul-ContentHash", resultHash)
}
return service, err
}
func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
return nil, err
}
s.defaultMetaPartitionToAgent(&entMeta)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil)
if err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
var filterExpression string
s.parseFilter(req, &filterExpression)
filter, err := bexpr.CreateFilter(filterExpression, nil, nil)
if err != nil {
return nil, err
}
// NOTE(partitions): this works because nodes exist in ONE partition
checks := s.agent.State.Checks(&entMeta)
2017-04-28 01:22:07 +00:00
agentChecks := make(map[types.CheckID]*structs.HealthCheck)
for id, c := range checks {
2017-04-28 01:22:07 +00:00
if c.ServiceTags == nil {
clone := *c
clone.ServiceTags = make([]string, 0)
agentChecks[id.ID] = &clone
} else {
agentChecks[id.ID] = c
2017-04-28 01:22:07 +00:00
}
}
raw, err := filter.Execute(agentChecks)
if err != nil {
return nil, err
}
agentChecks = raw.(map[types.CheckID]*structs.HealthCheck)
// Note: we filter the results with ACLs *after* applying the user-supplied
// bexpr filter, to ensure total (and the filter-by-acls header we set below)
// do not include results that would be filtered out even if the user did have
// permission.
total := len(agentChecks)
if err := s.agent.filterChecksWithAuthorizer(authz, agentChecks); err != nil {
return nil, err
}
// Set the X-Consul-Results-Filtered-By-ACLs header, but only if the user is
// authenticated (to prevent information leaking).
//
// This is done automatically for HTTP endpoints that proxy to an RPC endpoint
// that sets QueryMeta.ResultsFilteredByACLs, but must be done manually for
// agent-local endpoints.
//
// For more information see the comment on: Server.maskResultsFilteredByACLs.
if token != "" {
setResultsFilteredByACLs(resp, total != len(agentChecks))
}
return agentChecks, nil
2014-01-04 01:15:51 +00:00
}
func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
2014-01-04 01:15:51 +00:00
// Check if the WAN is being queried
wan := false
if other := req.URL.Query().Get("wan"); other != "" {
wan = true
}
segment := req.URL.Query().Get("segment")
2017-09-05 20:40:19 +00:00
if wan {
switch segment {
case "", api.AllSegments:
// The zero value and the special "give me all members"
// key are ok, otherwise the argument doesn't apply to
// the WAN.
default:
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Cannot provide a segment with wan=true"}
2017-09-05 20:40:19 +00:00
}
}
// Get the request partition and default to that of the agent.
entMeta := s.agent.AgentEnterpriseMeta()
if err := s.parseEntMetaPartition(req, entMeta); err != nil {
return nil, err
}
var members []serf.Member
2014-01-04 01:15:51 +00:00
if wan {
members = s.agent.WANMembers()
} else {
filter := consul.LANMemberFilter{
Partition: entMeta.PartitionOrDefault(),
}
if segment == api.AllSegments {
// Older 'consul members' calls will default to adding segment=_all
// so we only choose to use that request argument in the case where
// the partition is also the default and ignore it the rest of the time.
if acl.IsDefaultPartition(filter.Partition) {
filter.AllSegments = true
}
} else {
filter.Segment = segment
}
var err error
members, err = s.agent.delegate.LANMembers(filter)
if err != nil {
return nil, err
}
}
total := len(members)
if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err
2014-01-04 01:15:51 +00:00
}
// Set the X-Consul-Results-Filtered-By-ACLs header, but only if the user is
// authenticated (to prevent information leaking).
//
// This is done automatically for HTTP endpoints that proxy to an RPC endpoint
// that sets QueryMeta.ResultsFilteredByACLs, but must be done manually for
// agent-local endpoints.
//
// For more information see the comment on: Server.maskResultsFilteredByACLs.
if token != "" {
setResultsFilteredByACLs(resp, total != len(members))
}
return members, nil
2014-01-04 01:15:51 +00:00
}
func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
// Get the request partition and default to that of the agent.
entMeta := s.agent.AgentEnterpriseMeta()
if err := s.parseEntMetaPartition(req, entMeta); err != nil {
return nil, err
}
2014-01-04 01:15:51 +00:00
// Check if the WAN is being queried
wan := false
if other := req.URL.Query().Get("wan"); other != "" {
wan = true
}
// Get the address
2022-06-01 17:17:14 +00:00
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/join/")
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
2014-01-04 01:15:51 +00:00
if wan {
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
if s.agent.config.ConnectMeshGatewayWANFederationEnabled {
return nil, fmt.Errorf("WAN join is disabled when wan federation via mesh gateways is enabled")
}
_, err = s.agent.JoinWAN([]string{addr})
2014-01-04 01:15:51 +00:00
} else {
_, err = s.agent.JoinLAN([]string{addr}, entMeta)
2014-01-04 01:15:51 +00:00
}
return nil, err
2014-01-04 01:15:51 +00:00
}
func (s *HTTPHandlers) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
if err := s.agent.Leave(); err != nil {
return nil, err
}
return nil, s.agent.ShutdownAgent()
}
func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
2021-08-20 22:11:01 +00:00
// TODO(partitions): should this be possible in a partition?
if err := authz.ToAllowAuthorizer().OperatorWriteAllowed(nil); err != nil {
return nil, err
}
// Get the request partition and default to that of the agent.
entMeta := s.agent.AgentEnterpriseMeta()
if err := s.parseEntMetaPartition(req, entMeta); err != nil {
return nil, err
}
// Check the value of the prune query
_, prune := req.URL.Query()["prune"]
// Check if the WAN is being queried
_, wan := req.URL.Query()["wan"]
2022-06-01 17:17:14 +00:00
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
if wan {
return nil, s.agent.ForceLeaveWAN(addr, prune, entMeta)
} else {
return nil, s.agent.ForceLeave(addr, prune, entMeta)
}
2014-01-04 01:15:51 +00:00
}
// syncChanges is a helper function which wraps a blocking call to sync
// services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPHandlers) syncChanges() {
2017-08-28 12:17:13 +00:00
if err := s.agent.State.SyncChanges(); err != nil {
s.agent.logger.Error("failed to sync changes", "error", err)
}
}
func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var token string
s.parseToken(req, &token)
var args structs.CheckDefinition
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
if err := decodeBody(req.Body, &args); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)}
}
// Verify the check has a name.
if args.Name == "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing check name"}
}
if args.Status != "" && !structs.ValidStatus(args.Status) {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Bad check status"}
}
s.defaultMetaPartitionToAgent(&args.EnterpriseMeta)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
// Construct the health check.
health := args.HealthCheck(s.agent.config.NodeName)
// Verify the check type.
chkType := args.CheckType()
err = chkType.Validate()
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check: %v", err)}
}
2019-10-17 18:33:11 +00:00
// Store the type of check based on the definition
health.Type = chkType.Type()
if health.ServiceID != "" {
// fixup the service name so that vetCheckRegister requires the right ACLs
cid := health.CompoundServiceID()
2021-11-19 16:50:44 +00:00
service := s.agent.State.Service(cid)
if service != nil {
health.ServiceName = service.Service
2021-11-19 16:50:44 +00:00
} else {
return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("ServiceID %q does not exist", cid.String())}
}
}
// Get the provided token, if any, and vet against any ACL policies.
if err := s.agent.vetCheckRegisterWithAuthorizer(authz, health); err != nil {
return nil, err
}
// Add the check.
if err := s.agent.AddCheck(health, chkType, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/")
entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "")
checkID := structs.NewCheckID(types.CheckID(id), &entMeta)
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.parseEntMetaNoWildcard(req, &checkID.EnterpriseMeta); err != nil {
return nil, err
}
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &checkID.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
checkID.Normalize()
if !s.validateRequestPartition(resp, &checkID.EnterpriseMeta) {
return nil, nil
}
2021-11-19 16:50:44 +00:00
if err := s.agent.vetCheckUpdateWithAuthorizer(authz, checkID); err != nil {
return nil, err
}
if err := s.agent.RemoveCheck(checkID, true); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPHandlers) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/")
checkID := types.CheckID(id)
note := req.URL.Query().Get("note")
return s.agentCheckUpdate(resp, req, checkID, api.HealthPassing, note)
}
func (s *HTTPHandlers) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/")
checkID := types.CheckID(id)
note := req.URL.Query().Get("note")
return s.agentCheckUpdate(resp, req, checkID, api.HealthWarning, note)
}
func (s *HTTPHandlers) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/")
checkID := types.CheckID(id)
note := req.URL.Query().Get("note")
return s.agentCheckUpdate(resp, req, checkID, api.HealthCritical, note)
}
// checkUpdate is the payload for a PUT to AgentCheckUpdate.
type checkUpdate struct {
// Status us one of the api.Health* states, "passing", "warning", or
// "critical".
Status string
// Output is the information to post to the UI for operators as the
// output of the process that decided to hit the TTL check. This is
// different from the note field that's associated with the check
// itself.
Output string
}
// AgentCheckUpdate is a PUT-based alternative to the GET-based Pass/Warn/Fail
// APIs.
func (s *HTTPHandlers) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var update checkUpdate
if err := decodeBody(req.Body, &update); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)}
}
switch update.Status {
case api.HealthPassing:
case api.HealthWarning:
case api.HealthCritical:
default:
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check status: '%s'", update.Status)}
}
2022-06-01 17:17:14 +00:00
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/")
checkID := types.CheckID(id)
return s.agentCheckUpdate(resp, req, checkID, update.Status, update.Output)
}
func (s *HTTPHandlers) agentCheckUpdate(resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) {
entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "")
cid := structs.NewCheckID(checkID, &entMeta)
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.parseEntMetaNoWildcard(req, &cid.EnterpriseMeta); err != nil {
return nil, err
}
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &cid.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
cid.Normalize()
if err := s.agent.vetCheckUpdateWithAuthorizer(authz, cid); err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &cid.EnterpriseMeta) {
return nil, nil
}
if err := s.agent.updateTTLCheck(cid, status, output); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
// agentHealthService Returns Health for a given service ID
func agentHealthService(serviceID structs.ServiceID, s *HTTPHandlers) (int, string, api.HealthChecks) {
checks := s.agent.State.ChecksForService(serviceID, true)
serviceChecks := make(api.HealthChecks, 0)
for _, c := range checks {
// TODO: harmonize struct.HealthCheck and api.HealthCheck (or at least extract conversion function)
healthCheck := &api.HealthCheck{
Node: c.Node,
CheckID: string(c.CheckID),
Name: c.Name,
Status: c.Status,
Notes: c.Notes,
Output: c.Output,
ServiceID: c.ServiceID,
ServiceName: c.ServiceName,
ServiceTags: c.ServiceTags,
}
fillHealthCheckEnterpriseMeta(healthCheck, &c.EnterpriseMeta)
serviceChecks = append(serviceChecks, healthCheck)
}
status := serviceChecks.AggregatedStatus()
switch status {
case api.HealthWarning:
return http.StatusTooManyRequests, status, serviceChecks
case api.HealthPassing:
return http.StatusOK, status, serviceChecks
default:
return http.StatusServiceUnavailable, status, serviceChecks
}
}
func returnTextPlain(req *http.Request) bool {
if contentType := req.Header.Get("Accept"); strings.HasPrefix(contentType, "text/plain") {
return true
}
if format := req.URL.Query().Get("format"); format != "" {
return format == "text"
}
return false
}
// AgentHealthServiceByID return the local Service Health given its ID
func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Pull out the service id (service id since there may be several instance of the same service on this host)
2022-06-01 17:17:14 +00:00
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/id/")
if serviceID == "" {
return nil, &HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing serviceID"}
}
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
return nil, err
}
var token string
s.parseToken(req, &token)
// need to resolve to default the meta
s.defaultMetaPartitionToAgent(&entMeta)
var authzContext acl.AuthorizerContext
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext)
if err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
sid := structs.NewServiceID(serviceID, &entMeta)
dc := s.agent.config.Datacenter
if service := s.agent.State.Service(sid); service != nil {
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(service.Service, &authzContext); err != nil {
return nil, err
}
code, status, healthChecks := agentHealthService(sid, s)
if returnTextPlain(req) {
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
}
serviceInfo := buildAgentService(service, dc)
result := &api.AgentServiceChecksInfo{
AggregatedStatus: status,
Checks: healthChecks,
Service: &serviceInfo,
}
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
}
notFoundReason := fmt.Sprintf("ServiceId %s not found", sid.String())
if returnTextPlain(req) {
return notFoundReason, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "text/plain"}
}
return &api.AgentServiceChecksInfo{
AggregatedStatus: api.HealthCritical,
Checks: nil,
Service: nil,
}, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"}
}
// AgentHealthServiceByName return the worse status of all the services with given name on an agent
func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Pull out the service name
2022-06-01 17:17:14 +00:00
serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/name/")
if serviceName == "" {
return nil, &HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service Name"}
}
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
return nil, err
}
var token string
s.parseToken(req, &token)
s.defaultMetaPartitionToAgent(&entMeta)
// need to resolve to default the meta
var authzContext acl.AuthorizerContext
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext)
if err != nil {
return nil, err
}
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(serviceName, &authzContext); err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &entMeta) {
return nil, nil
}
dc := s.agent.config.Datacenter
code := http.StatusNotFound
status := fmt.Sprintf("ServiceName %s Not Found", serviceName)
services := s.agent.State.ServicesByName(structs.NewServiceName(serviceName, &entMeta))
result := make([]api.AgentServiceChecksInfo, 0, 16)
for _, service := range services {
sid := structs.NewServiceID(service.ID, &entMeta)
scode, sstatus, healthChecks := agentHealthService(sid, s)
serviceInfo := buildAgentService(service, dc)
res := api.AgentServiceChecksInfo{
AggregatedStatus: sstatus,
Checks: healthChecks,
Service: &serviceInfo,
}
result = append(result, res)
// When service is not found, we ignore it and keep existing HTTP status
if code == http.StatusNotFound {
code = scode
status = sstatus
}
// We take the worst of all statuses, so we keep iterating
// passing: 200 < warning: 429 < critical: 503
if code < scode {
code = scode
status = sstatus
}
}
if returnTextPlain(req) {
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
}
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
}
func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.ServiceDefinition
// Fixup the type decode of TTL or Interval if a check if provided.
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
if err := decodeBody(req.Body, &args); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)}
}
// Verify the service has a name.
if args.Name == "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service name"}
}
// Check the service address here and in the catalog RPC endpoint
// since service registration isn't synchronous.
if ipaddr.IsAny(args.Address) {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Invalid service address"}
}
var token string
s.parseToken(req, &token)
s.defaultMetaPartitionToAgent(&args.EnterpriseMeta)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
// Get the node service.
ns := args.NodeService()
2018-09-07 14:30:47 +00:00
if ns.Weights != nil {
if err := structs.ValidateWeights(ns.Weights); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid Weights: %v", err)}
2018-09-07 14:30:47 +00:00
}
}
wan federation via mesh gateways (#6884) This is like a Möbius strip of code due to the fact that low-level components (serf/memberlist) are connected to high-level components (the catalog and mesh-gateways) in a twisty maze of references which make it hard to dive into. With that in mind here's a high level summary of what you'll find in the patch: There are several distinct chunks of code that are affected: * new flags and config options for the server * retry join WAN is slightly different * retry join code is shared to discover primary mesh gateways from secondary datacenters * because retry join logic runs in the *agent* and the results of that operation for primary mesh gateways are needed in the *server* there are some methods like `RefreshPrimaryGatewayFallbackAddresses` that must occur at multiple layers of abstraction just to pass the data down to the right layer. * new cache type `FederationStateListMeshGatewaysName` for use in `proxycfg/xds` layers * the function signature for RPC dialing picked up a new required field (the node name of the destination) * several new RPCs for manipulating a FederationState object: `FederationState:{Apply,Get,List,ListMeshGateways}` * 3 read-only internal APIs for debugging use to invoke those RPCs from curl * raft and fsm changes to persist these FederationStates * replication for FederationStates as they are canonically stored in the Primary and replicated to the Secondaries. * a special derivative of anti-entropy that runs in secondaries to snapshot their local mesh gateway `CheckServiceNodes` and sync them into their upstream FederationState in the primary (this works in conjunction with the replication to distribute addresses for all mesh gateways in all DCs to all other DCs) * a "gateway locator" convenience object to make use of this data to choose the addresses of gateways to use for any given RPC or gossip operation to a remote DC. This gets data from the "retry join" logic in the agent and also directly calls into the FSM. * RPC (`:8300`) on the server sniffs the first byte of a new connection to determine if it's actually doing native TLS. If so it checks the ALPN header for protocol determination (just like how the existing system uses the type-byte marker). * 2 new kinds of protocols are exclusively decoded via this native TLS mechanism: one for ferrying "packet" operations (udp-like) from the gossip layer and one for "stream" operations (tcp-like). The packet operations re-use sockets (using length-prefixing) to cut down on TLS re-negotiation overhead. * the server instances specially wrap the `memberlist.NetTransport` when running with gateway federation enabled (in a `wanfed.Transport`). The general gist is that if it tries to dial a node in the SAME datacenter (deduced by looking at the suffix of the node name) there is no change. If dialing a DIFFERENT datacenter it is wrapped up in a TLS+ALPN blob and sent through some mesh gateways to eventually end up in a server's :8300 port. * a new flag when launching a mesh gateway via `consul connect envoy` to indicate that the servers are to be exposed. This sets a special service meta when registering the gateway into the catalog. * `proxycfg/xds` notice this metadata blob to activate additional watches for the FederationState objects as well as the location of all of the consul servers in that datacenter. * `xds:` if the extra metadata is in place additional clusters are defined in a DC to bulk sink all traffic to another DC's gateways. For the current datacenter we listen on a wildcard name (`server.<dc>.consul`) that load balances all servers as well as one mini-cluster per node (`<node>.server.<dc>.consul`) * the `consul tls cert create` command got a new flag (`-node`) to help create an additional SAN in certs that can be used with this flavor of federation.
2020-03-09 20:59:02 +00:00
if err := structs.ValidateServiceMetadata(ns.Kind, ns.Meta, false); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid Service Meta: %v", err)}
2018-02-07 00:54:42 +00:00
}
// Run validation. This same validation would happen on the catalog endpoint,
// so it helps ensure the sync will work properly.
if err := ns.Validate(); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Validation failed: %v", err.Error())}
}
// Verify the check type.
chkTypes, err := args.CheckTypes()
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check: %v", err)}
}
for _, check := range chkTypes {
if check.Status != "" && !structs.ValidStatus(check.Status) {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Status for checks must 'passing', 'warning', 'critical'"}
}
}
// Verify the sidecar check types
if args.Connect != nil && args.Connect.SidecarService != nil {
chkTypes, err := args.Connect.SidecarService.CheckTypes()
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid check in sidecar_service: %v", err)}
}
for _, check := range chkTypes {
if check.Status != "" && !structs.ValidStatus(check.Status) {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Status for checks must 'passing', 'warning', 'critical'"}
}
}
}
// Get the provided token, if any, and vet against any ACL policies.
if err := s.agent.vetServiceRegisterWithAuthorizer(authz, ns); err != nil {
return nil, err
}
// See if we have a sidecar to register too
sidecar, sidecarChecks, sidecarToken, err := sidecarServiceFromNodeService(ns, token)
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid SidecarService: %s", err)}
}
if sidecar != nil {
if err := sidecar.ValidateForAgent(); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Failed Validation: %v", err.Error())}
}
// Make sure we are allowed to register the sidecar using the token
// specified (might be specific to sidecar or the same one as the overall
// request).
if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil {
return nil, err
}
// We parsed the sidecar registration, now remove it from the NodeService
// for the actual service since it's done it's job and we don't want to
// persist it in the actual state/catalog. SidecarService is meant to be a
// registration syntax sugar so don't propagate it any further.
ns.Connect.SidecarService = nil
}
// Add the service.
replaceExistingChecks := false
query := req.URL.Query()
if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") {
replaceExistingChecks = true
}
addReq := AddServiceRequest{
Service: ns,
chkTypes: chkTypes,
persist: true,
token: token,
Source: ConfigSourceRemote,
replaceExistingChecks: replaceExistingChecks,
}
if err := s.agent.AddService(addReq); err != nil {
return nil, err
}
if sidecar != nil {
addReq := AddServiceRequest{
Service: sidecar,
chkTypes: sidecarChecks,
persist: true,
token: sidecarToken,
Source: ConfigSourceRemote,
replaceExistingChecks: replaceExistingChecks,
}
if err := s.agent.AddService(addReq); err != nil {
return nil, err
}
}
s.syncChanges()
return nil, nil
}
func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2022-06-01 17:17:14 +00:00
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/")
entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "")
sid := structs.NewServiceID(serviceID, &entMeta)
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil {
return nil, err
}
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
sid.Normalize()
if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) {
return nil, nil
}
2021-11-19 16:50:44 +00:00
if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil {
return nil, err
}
if err := s.agent.RemoveService(sid); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPHandlers) AgentServiceMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Ensure we have a service ID
2022-06-01 17:17:14 +00:00
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/maintenance/")
entMeta := acl.NewEnterpriseMetaWithPartition(s.agent.config.PartitionOrDefault(), "")
sid := structs.NewServiceID(serviceID, &entMeta)
if sid.ID == "" {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing service ID"}
}
// Ensure we have some action
params := req.URL.Query()
if _, ok := params["enable"]; !ok {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing value for enable"}
}
raw := params.Get("enable")
enable, err := strconv.ParseBool(raw)
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid value for enable: %q", raw)}
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil {
return nil, err
}
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil)
if err != nil {
return nil, err
}
sid.Normalize()
if !s.validateRequestPartition(resp, &sid.EnterpriseMeta) {
return nil, nil
}
if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil {
return nil, err
}
if enable {
reason := params.Get("reason")
if err = s.agent.EnableServiceMaintenance(sid, reason, token); err != nil {
return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: err.Error()}
}
} else {
if err = s.agent.DisableServiceMaintenance(sid); err != nil {
return nil, HTTPError{StatusCode: http.StatusNotFound, Reason: err.Error()}
}
}
s.syncChanges()
return nil, nil
}
2015-01-15 19:20:22 +00:00
func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2015-01-15 19:20:22 +00:00
// Ensure we have some action
params := req.URL.Query()
if _, ok := params["enable"]; !ok {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing value for enable"}
2015-01-15 19:20:22 +00:00
}
raw := params.Get("enable")
enable, err := strconv.ParseBool(raw)
if err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Invalid value for enable: %q", raw)}
2015-01-15 19:20:22 +00:00
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().NodeWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
2015-01-15 19:20:22 +00:00
if enable {
s.agent.EnableNodeMaintenance(params.Get("reason"), token)
2015-01-15 19:20:22 +00:00
} else {
s.agent.DisableNodeMaintenance()
}
s.syncChanges()
2015-01-15 19:20:22 +00:00
return nil, nil
}
func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
2016-11-28 21:08:31 +00:00
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentReadAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
2016-11-28 21:08:31 +00:00
// Get the provided loglevel.
2016-11-16 21:45:26 +00:00
logLevel := req.URL.Query().Get("loglevel")
if logLevel == "" {
logLevel = "INFO"
}
var logJSON bool
if _, ok := req.URL.Query()["logjson"]; ok {
logJSON = true
}
2016-11-16 21:45:26 +00:00
if !logging.ValidateLogLevel(logLevel) {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Unknown log level: %s", logLevel)}
2016-11-16 21:45:26 +00:00
}
2016-11-16 21:45:26 +00:00
flusher, ok := resp.(http.Flusher)
if !ok {
return nil, fmt.Errorf("Streaming not supported")
}
monitor := monitor.New(monitor.Config{
BufferSize: 512,
Logger: s.agent.logger,
LoggerOptions: &hclog.LoggerOptions{
Level: logging.LevelFromString(logLevel),
JSONFormat: logJSON,
},
})
logsCh := monitor.Start()
2016-11-16 21:45:26 +00:00
// Send header so client can start streaming body
resp.WriteHeader(http.StatusOK)
// 0 byte write is needed before the Flush call so that if we are using
// a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte(""))
flusher.Flush()
const flushDelay = 200 * time.Millisecond
flushTicker := time.NewTicker(flushDelay)
defer flushTicker.Stop()
// Stream logs until the connection is closed.
2016-11-16 21:45:26 +00:00
for {
select {
case <-req.Context().Done():
droppedCount := monitor.Stop()
if droppedCount > 0 {
s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount)
2016-11-28 21:08:31 +00:00
}
flusher.Flush()
2016-11-16 21:45:26 +00:00
return nil, nil
case log := <-logsCh:
fmt.Fprint(resp, string(log))
case <-flushTicker.C:
2016-11-16 21:45:26 +00:00
flusher.Flush()
}
}
}
func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.checkACLDisabled() {
return nil, HTTPError{StatusCode: http.StatusUnauthorized, Reason: "ACL support disabled"}
}
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if err := authz.ToAllowAuthorizer().AgentWriteAllowed(s.agent.config.NodeName, &authzContext); err != nil {
return nil, err
}
// The body is just the token, but it's in a JSON object so we can add
// fields to this later if needed.
var args api.AgentToken
if err := decodeBody(req.Body, &args); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)}
}
// Figure out the target token.
2022-06-01 17:17:14 +00:00
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
err = s.agent.tokens.WithPersistenceLock(func() error {
triggerAntiEntropySync := false
switch target {
case "acl_token", "default":
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
case "acl_agent_token", "agent":
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
if changed {
triggerAntiEntropySync = true
}
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
case "acl_agent_master_token", "agent_master", "agent_recovery":
s.agent.tokens.UpdateAgentRecoveryToken(args.Token, token_store.TokenSourceAPI)
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
case "acl_replication_token", "replication":
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
case "config_file_service_registration":
s.agent.tokens.UpdateConfigFileRegistrationToken(args.Token, token_store.TokenSourceAPI)
default:
return HTTPError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("Token %q is unknown", target)}
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
}
// TODO: is it safe to move this out of WithPersistenceLock?
if triggerAntiEntropySync {
s.agent.sync.SyncFull.Trigger()
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
}
return nil
})
if err != nil {
return nil, err
ACL Token Persistence and Reloading (#5328) This PR adds two features which will be useful for operators when ACLs are in use. 1. Tokens set in configuration files are now reloadable. 2. If `acl.enable_token_persistence` is set to `true` in the configuration, tokens set via the `v1/agent/token` endpoint are now persisted to disk and loaded when the agent starts (or during configuration reload) Note that token persistence is opt-in so our users who do not want tokens on the local disk will see no change. Some other secondary changes: * Refactored a bunch of places where the replication token is retrieved from the token store. This token isn't just for replicating ACLs and now it is named accordingly. * Allowed better paths in the `v1/agent/token/` API. Instead of paths like: `v1/agent/token/acl_replication_token` the path can now be just `v1/agent/token/replication`. The old paths remain to be valid. * Added a couple new API functions to set tokens via the new paths. Deprecated the old ones and pointed to the new names. The names are also generally better and don't imply that what you are setting is for ACLs but rather are setting ACL tokens. There is a minor semantic difference there especially for the replication token as again, its no longer used only for ACL token/policy replication. The new functions will detect 404s and fallback to using the older token paths when talking to pre-1.4.3 agents. * Docs updated to reflect the API additions and to show using the new endpoints. * Updated the ACL CLI set-agent-tokens command to use the non-deprecated APIs.
2019-02-27 19:28:31 +00:00
}
s.agent.logger.Info("Updated agent's ACL token", "token", target)
return nil, nil
}
2018-03-17 04:39:26 +00:00
// AgentConnectCARoots returns the trusted CA roots.
func (s *HTTPHandlers) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.DCSpecificRequest
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCARootName, &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
// Add cache hit
reply, ok := raw.(*structs.IndexedCARoots)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
defer setMeta(resp, &reply.QueryMeta)
return *reply, nil
2018-03-17 04:39:26 +00:00
}
2018-03-21 17:55:39 +00:00
// AgentConnectCALeafCert returns the certificate bundle for a service
// instance. This endpoint ignores all "Cache-Control" attributes.
// This supports blocking queries to update the returned bundle.
// Non-blocking queries will always verify that the cache entry is still valid.
func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the service name. Note that this is the name of the service,
2018-05-22 17:33:14 +00:00
// not the ID of the service instance.
2022-06-01 17:17:14 +00:00
serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/ca/leaf/")
2018-03-21 17:55:39 +00:00
// TODO(peering): expose way to get kind=mesh-gateway type cert with appropriate ACLs
args := cachetype.ConnectCALeafRequest{
2019-08-27 21:45:58 +00:00
Service: serviceName, // Need name not ID
}
var qOpts structs.QueryOptions
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
// Store DC in the ConnectCALeafRequest but query opts separately
if done := s.parse(resp, req, &args.Datacenter, &qOpts); done {
return nil, nil
}
args.MinQueryIndex = qOpts.MinQueryIndex
args.MaxQueryTime = qOpts.MaxQueryTime
args.Token = qOpts.Token
// TODO(ffmmmm): maybe set MustRevalidate in ConnectCALeafRequest (as part of CacheInfo())
// We don't want non-blocking queries to return expired leaf certs
// or leaf certs not valid under the current CA. So always revalidate
// the leaf cert on non-blocking queries (ie when MinQueryIndex == 0)
if args.MinQueryIndex == 0 {
args.MustRevalidate = true
}
if !s.validateRequestPartition(resp, &args.EnterpriseMeta) {
return nil, nil
}
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IssuedCert)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
setIndex(resp, reply.ModifyIndex)
2018-03-21 17:55:39 +00:00
return reply, nil
2018-03-21 17:55:39 +00:00
}
// AgentConnectAuthorize
//
// POST /v1/agent/connect/authorize
//
// NOTE: This endpoint treats any L7 intentions as DENY.
//
2018-05-19 04:03:10 +00:00
// Note: when this logic changes, consider if the Intention.Check RPC method
// also needs to be updated.
func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the token
var token string
s.parseToken(req, &token)
var authReq structs.ConnectAuthorizeRequest
if err := s.parseEntMetaNoWildcard(req, &authReq.EnterpriseMeta); err != nil {
return nil, err
}
if err := decodeBody(req.Body, &authReq); err != nil {
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: fmt.Sprintf("Request decode failed: %v", err)}
}
2018-03-27 17:09:13 +00:00
if !s.validateRequestPartition(resp, &authReq.EnterpriseMeta) {
return nil, nil
}
authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq)
if err != nil {
return nil, err
}
setCacheMeta(resp, cacheMeta)
return &connectAuthorizeResp{
Authorized: authz,
Reason: reason,
}, nil
}
// connectAuthorizeResp is the response format/structure for the
// /v1/agent/connect/authorize endpoint.
type connectAuthorizeResp struct {
Authorized bool // True if authorized, false if not
Reason string // Reason for the Authorized value (whether true or false)
}
New command: consul debug (#4754) * agent/debug: add package for debugging, host info * api: add v1/agent/host endpoint * agent: add v1/agent/host endpoint * command/debug: implementation of static capture * command/debug: tests and only configured targets * agent/debug: add basic test for host metrics * command/debug: add methods for dynamic data capture * api: add debug/pprof endpoints * command/debug: add pprof * command/debug: timing, wg, logs to disk * vendor: add gopsutil/disk * command/debug: add a usage section * website: add docs for consul debug * agent/host: require operator:read * api/host: improve docs and no retry timing * command/debug: fail on extra arguments * command/debug: fixup file permissions to 0644 * command/debug: remove server flags * command/debug: improve clarity of usage section * api/debug: add Trace for profiling, fix profile * command/debug: capture profile and trace at the same time * command/debug: add index document * command/debug: use "clusters" in place of members * command/debug: remove address in output * command/debug: improve comment on metrics sleep * command/debug: clarify usage * agent: always register pprof handlers and protect This will allow us to avoid a restart of a target agent for profiling by always registering the pprof handlers. Given this is a potentially sensitive path, it is protected with an operator:read ACL and enable debug being set to true on the target agent. enable_debug still requires a restart. If ACLs are disabled, enable_debug is sufficient. * command/debug: use trace.out instead of .prof More in line with golang docs. * agent: fix comment wording * agent: wrap table driven tests in t.run()
2018-10-17 20:20:35 +00:00
// AgentHost
//
// GET /v1/agent/host
//
// Retrieves information about resources available and in-use for the
// host the agent is running on such as CPU, memory, and disk usage. Requires
// a operator:read ACL token.
func (s *HTTPHandlers) AgentHost(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
New command: consul debug (#4754) * agent/debug: add package for debugging, host info * api: add v1/agent/host endpoint * agent: add v1/agent/host endpoint * command/debug: implementation of static capture * command/debug: tests and only configured targets * agent/debug: add basic test for host metrics * command/debug: add methods for dynamic data capture * api: add debug/pprof endpoints * command/debug: add pprof * command/debug: timing, wg, logs to disk * vendor: add gopsutil/disk * command/debug: add a usage section * website: add docs for consul debug * agent/host: require operator:read * api/host: improve docs and no retry timing * command/debug: fail on extra arguments * command/debug: fixup file permissions to 0644 * command/debug: remove server flags * command/debug: improve clarity of usage section * api/debug: add Trace for profiling, fix profile * command/debug: capture profile and trace at the same time * command/debug: add index document * command/debug: use "clusters" in place of members * command/debug: remove address in output * command/debug: improve comment on metrics sleep * command/debug: clarify usage * agent: always register pprof handlers and protect This will allow us to avoid a restart of a target agent for profiling by always registering the pprof handlers. Given this is a potentially sensitive path, it is protected with an operator:read ACL and enable debug being set to true on the target agent. enable_debug still requires a restart. If ACLs are disabled, enable_debug is sufficient. * command/debug: use trace.out instead of .prof More in line with golang docs. * agent: fix comment wording * agent: wrap table driven tests in t.run()
2018-10-17 20:20:35 +00:00
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
New command: consul debug (#4754) * agent/debug: add package for debugging, host info * api: add v1/agent/host endpoint * agent: add v1/agent/host endpoint * command/debug: implementation of static capture * command/debug: tests and only configured targets * agent/debug: add basic test for host metrics * command/debug: add methods for dynamic data capture * api: add debug/pprof endpoints * command/debug: add pprof * command/debug: timing, wg, logs to disk * vendor: add gopsutil/disk * command/debug: add a usage section * website: add docs for consul debug * agent/host: require operator:read * api/host: improve docs and no retry timing * command/debug: fail on extra arguments * command/debug: fixup file permissions to 0644 * command/debug: remove server flags * command/debug: improve clarity of usage section * api/debug: add Trace for profiling, fix profile * command/debug: capture profile and trace at the same time * command/debug: add index document * command/debug: use "clusters" in place of members * command/debug: remove address in output * command/debug: improve comment on metrics sleep * command/debug: clarify usage * agent: always register pprof handlers and protect This will allow us to avoid a restart of a target agent for profiling by always registering the pprof handlers. Given this is a potentially sensitive path, it is protected with an operator:read ACL and enable debug being set to true on the target agent. enable_debug still requires a restart. If ACLs are disabled, enable_debug is sufficient. * command/debug: use trace.out instead of .prof More in line with golang docs. * agent: fix comment wording * agent: wrap table driven tests in t.run()
2018-10-17 20:20:35 +00:00
if err != nil {
return nil, err
}
2021-08-20 22:11:01 +00:00
// TODO(partitions): should this be possible in a partition?
if err := authz.ToAllowAuthorizer().OperatorReadAllowed(nil); err != nil {
return nil, err
New command: consul debug (#4754) * agent/debug: add package for debugging, host info * api: add v1/agent/host endpoint * agent: add v1/agent/host endpoint * command/debug: implementation of static capture * command/debug: tests and only configured targets * agent/debug: add basic test for host metrics * command/debug: add methods for dynamic data capture * api: add debug/pprof endpoints * command/debug: add pprof * command/debug: timing, wg, logs to disk * vendor: add gopsutil/disk * command/debug: add a usage section * website: add docs for consul debug * agent/host: require operator:read * api/host: improve docs and no retry timing * command/debug: fail on extra arguments * command/debug: fixup file permissions to 0644 * command/debug: remove server flags * command/debug: improve clarity of usage section * api/debug: add Trace for profiling, fix profile * command/debug: capture profile and trace at the same time * command/debug: add index document * command/debug: use "clusters" in place of members * command/debug: remove address in output * command/debug: improve comment on metrics sleep * command/debug: clarify usage * agent: always register pprof handlers and protect This will allow us to avoid a restart of a target agent for profiling by always registering the pprof handlers. Given this is a potentially sensitive path, it is protected with an operator:read ACL and enable debug being set to true on the target agent. enable_debug still requires a restart. If ACLs are disabled, enable_debug is sufficient. * command/debug: use trace.out instead of .prof More in line with golang docs. * agent: fix comment wording * agent: wrap table driven tests in t.run()
2018-10-17 20:20:35 +00:00
}
return debug.CollectHostInfo(), nil
}