f9a43a1e2d
* ACL Authorizer overhaul To account for upcoming features every Authorization function can now take an extra *acl.EnterpriseAuthorizerContext. These are unused in OSS and will always be nil. Additionally the acl package has received some thorough refactoring to enable all of the extra Consul Enterprise specific authorizations including moving sentinel enforcement into the stubbed structs. The Authorizer funcs now return an acl.EnforcementDecision instead of a boolean. This improves the overall interface as it makes multiple Authorizers easily chainable as they now indicate whether they had an authoritative decision or should use some other defaults. A ChainedAuthorizer was added to handle this Authorizer enforcement chain and will never itself return a non-authoritative decision. * Include stub for extra enterprise rules in the global management policy * Allow for an upgrade of the global-management policy
1382 lines
41 KiB
Go
1382 lines
41 KiB
Go
package agent
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/mitchellh/hashstructure"
|
|
|
|
"github.com/hashicorp/consul/acl"
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/debug"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
token_store "github.com/hashicorp/consul/agent/token"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/ipaddr"
|
|
"github.com/hashicorp/consul/lib"
|
|
"github.com/hashicorp/consul/lib/file"
|
|
"github.com/hashicorp/consul/logger"
|
|
"github.com/hashicorp/consul/types"
|
|
"github.com/hashicorp/go-bexpr"
|
|
"github.com/hashicorp/logutils"
|
|
"github.com/hashicorp/serf/coordinate"
|
|
"github.com/hashicorp/serf/serf"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
)
|
|
|
|
type Self struct {
|
|
Config interface{}
|
|
DebugConfig map[string]interface{}
|
|
Coord *coordinate.Coordinate
|
|
Member serf.Member
|
|
Stats map[string]map[string]string
|
|
Meta map[string]string
|
|
}
|
|
|
|
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
var cs lib.CoordinateSet
|
|
if !s.agent.config.DisableCoordinates {
|
|
var err error
|
|
if cs, err = s.agent.GetLANCoordinate(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
config := struct {
|
|
Datacenter string
|
|
NodeName string
|
|
NodeID string
|
|
Revision string
|
|
Server bool
|
|
Version string
|
|
}{
|
|
Datacenter: s.agent.config.Datacenter,
|
|
NodeName: s.agent.config.NodeName,
|
|
NodeID: string(s.agent.config.NodeID),
|
|
Revision: s.agent.config.Revision,
|
|
Server: s.agent.config.ServerMode,
|
|
Version: s.agent.config.Version,
|
|
}
|
|
return Self{
|
|
Config: config,
|
|
DebugConfig: s.agent.config.Sanitized(),
|
|
Coord: cs[s.agent.config.SegmentName],
|
|
Member: s.agent.LocalMember(),
|
|
Stats: s.agent.Stats(),
|
|
Meta: s.agent.State.Metadata(),
|
|
}, nil
|
|
}
|
|
|
|
// enablePrometheusOutput will look for Prometheus mime-type or format Query parameter the same way as Nomad
|
|
func enablePrometheusOutput(req *http.Request) bool {
|
|
if format := req.URL.Query().Get("format"); format == "prometheus" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
if enablePrometheusOutput(req) {
|
|
if s.agent.config.Telemetry.PrometheusRetentionTime < 1 {
|
|
resp.WriteHeader(http.StatusUnsupportedMediaType)
|
|
fmt.Fprint(resp, "Prometheus is not enabled since its retention time is not positive")
|
|
return nil, nil
|
|
}
|
|
handlerOptions := promhttp.HandlerOpts{
|
|
ErrorLog: s.agent.logger,
|
|
ErrorHandling: promhttp.ContinueOnError,
|
|
}
|
|
|
|
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions)
|
|
handler.ServeHTTP(resp, req)
|
|
return nil, nil
|
|
}
|
|
return s.agent.MemSink.DisplayMetrics(resp, req)
|
|
}
|
|
|
|
func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
// Trigger the reload
|
|
errCh := make(chan error)
|
|
select {
|
|
case <-s.agent.shutdownCh:
|
|
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
|
case s.agent.reloadCh <- errCh:
|
|
}
|
|
|
|
// Wait for the result of the reload, or for the agent to shutdown
|
|
select {
|
|
case <-s.agent.shutdownCh:
|
|
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
|
|
case err := <-errCh:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func buildAgentService(s *structs.NodeService) api.AgentService {
|
|
weights := api.AgentWeights{Passing: 1, Warning: 1}
|
|
if s.Weights != nil {
|
|
if s.Weights.Passing > 0 {
|
|
weights.Passing = s.Weights.Passing
|
|
}
|
|
weights.Warning = s.Weights.Warning
|
|
}
|
|
|
|
var taggedAddrs map[string]api.ServiceAddress
|
|
if len(s.TaggedAddresses) > 0 {
|
|
taggedAddrs = make(map[string]api.ServiceAddress)
|
|
for k, v := range s.TaggedAddresses {
|
|
taggedAddrs[k] = v.ToAPIServiceAddress()
|
|
}
|
|
}
|
|
|
|
as := api.AgentService{
|
|
Kind: api.ServiceKind(s.Kind),
|
|
ID: s.ID,
|
|
Service: s.Service,
|
|
Tags: s.Tags,
|
|
Meta: s.Meta,
|
|
Port: s.Port,
|
|
Address: s.Address,
|
|
TaggedAddresses: taggedAddrs,
|
|
EnableTagOverride: s.EnableTagOverride,
|
|
CreateIndex: s.CreateIndex,
|
|
ModifyIndex: s.ModifyIndex,
|
|
Weights: weights,
|
|
}
|
|
|
|
if as.Tags == nil {
|
|
as.Tags = []string{}
|
|
}
|
|
if as.Meta == nil {
|
|
as.Meta = map[string]string{}
|
|
}
|
|
// Attach Proxy config if exists
|
|
if s.Kind == structs.ServiceKindConnectProxy ||
|
|
s.Kind == structs.ServiceKindMeshGateway {
|
|
|
|
as.Proxy = s.Proxy.ToAPI()
|
|
}
|
|
|
|
// Attach Connect configs if they exist.
|
|
if s.Connect.Native {
|
|
as.Connect = &api.AgentServiceConnect{
|
|
Native: true,
|
|
}
|
|
}
|
|
return as
|
|
}
|
|
|
|
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
|
|
var filterExpression string
|
|
s.parseFilter(req, &filterExpression)
|
|
|
|
services := s.agent.State.Services()
|
|
if err := s.agent.filterServices(token, &services); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Convert into api.AgentService since that includes Connect config but so far
|
|
// NodeService doesn't need to internally. They are otherwise identical since
|
|
// that is the struct used in client for reading the one we output here
|
|
// anyway.
|
|
agentSvcs := make(map[string]*api.AgentService)
|
|
|
|
// Use empty list instead of nil
|
|
for id, s := range services {
|
|
agentService := buildAgentService(s)
|
|
agentSvcs[id] = &agentService
|
|
}
|
|
|
|
filter, err := bexpr.CreateFilter(filterExpression, nil, agentSvcs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return filter.Execute(agentSvcs)
|
|
}
|
|
|
|
// GET /v1/agent/service/:service_id
|
|
//
|
|
// Returns the service definition for a single local services and allows
|
|
// blocking watch using hash-based blocking.
|
|
func (s *HTTPServer) AgentService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Get the proxy ID. Note that this is the ID of a proxy's service instance.
|
|
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/")
|
|
|
|
// Maybe block
|
|
var queryOpts structs.QueryOptions
|
|
if parseWait(resp, req, &queryOpts) {
|
|
// parseWait returns an error itself
|
|
return nil, nil
|
|
}
|
|
|
|
// Parse the token
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
|
|
// Parse hash specially. Eventually this should happen in parseWait and end up
|
|
// in QueryOptions but I didn't want to make very general changes right away.
|
|
hash := req.URL.Query().Get("hash")
|
|
|
|
resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime,
|
|
func(ws memdb.WatchSet) (string, interface{}, error) {
|
|
|
|
svcState := s.agent.State.ServiceState(id)
|
|
if svcState == nil {
|
|
resp.WriteHeader(http.StatusNotFound)
|
|
fmt.Fprintf(resp, "unknown proxy service ID: %s", id)
|
|
return "", nil, nil
|
|
}
|
|
|
|
svc := svcState.Service
|
|
|
|
// Setup watch on the service
|
|
ws.Add(svcState.WatchCh)
|
|
|
|
// Check ACLs.
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
// TODO (namespaces) - pass through a real ent authz ctx
|
|
if rule != nil && rule.ServiceRead(svc.Service, nil) != acl.Allow {
|
|
return "", nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
// Calculate the content hash over the response, minus the hash field
|
|
aSvc := buildAgentService(svc)
|
|
reply := &aSvc
|
|
|
|
rawHash, err := hashstructure.Hash(reply, nil)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
// Include the ContentHash in the response body
|
|
reply.ContentHash = fmt.Sprintf("%x", rawHash)
|
|
|
|
return reply.ContentHash, reply, nil
|
|
},
|
|
)
|
|
if resultHash != "" {
|
|
resp.Header().Set("X-Consul-ContentHash", resultHash)
|
|
}
|
|
return service, err
|
|
}
|
|
|
|
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
|
|
var filterExpression string
|
|
s.parseFilter(req, &filterExpression)
|
|
filter, err := bexpr.CreateFilter(filterExpression, nil, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
checks := s.agent.State.Checks()
|
|
if err := s.agent.filterChecks(token, &checks); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Use empty list instead of nil
|
|
for id, c := range checks {
|
|
if c.ServiceTags == nil {
|
|
clone := *c
|
|
clone.ServiceTags = make([]string, 0)
|
|
checks[id] = &clone
|
|
}
|
|
}
|
|
|
|
return filter.Execute(checks)
|
|
}
|
|
|
|
func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
|
|
// Check if the WAN is being queried
|
|
wan := false
|
|
if other := req.URL.Query().Get("wan"); other != "" {
|
|
wan = true
|
|
}
|
|
|
|
segment := req.URL.Query().Get("segment")
|
|
if wan {
|
|
switch segment {
|
|
case "", api.AllSegments:
|
|
// The zero value and the special "give me all members"
|
|
// key are ok, otherwise the argument doesn't apply to
|
|
// the WAN.
|
|
default:
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
var members []serf.Member
|
|
if wan {
|
|
members = s.agent.WANMembers()
|
|
} else {
|
|
var err error
|
|
if segment == api.AllSegments {
|
|
members, err = s.agent.delegate.LANMembersAllSegments()
|
|
} else {
|
|
members, err = s.agent.delegate.LANSegmentMembers(segment)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if err := s.agent.filterMembers(token, &members); err != nil {
|
|
return nil, err
|
|
}
|
|
return members, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
// Check if the WAN is being queried
|
|
wan := false
|
|
if other := req.URL.Query().Get("wan"); other != "" {
|
|
wan = true
|
|
}
|
|
|
|
// Get the address
|
|
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/join/")
|
|
if wan {
|
|
_, err = s.agent.JoinWAN([]string{addr})
|
|
} else {
|
|
_, err = s.agent.JoinLAN([]string{addr})
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
if err := s.agent.Leave(); err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, s.agent.ShutdownAgent()
|
|
}
|
|
|
|
func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
//Check the value of the prune query
|
|
_, prune := req.URL.Query()["prune"]
|
|
|
|
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
|
|
return nil, s.agent.ForceLeave(addr, prune)
|
|
}
|
|
|
|
// syncChanges is a helper function which wraps a blocking call to sync
|
|
// services and checks to the server. If the operation fails, we only
|
|
// only warn because the write did succeed and anti-entropy will sync later.
|
|
func (s *HTTPServer) syncChanges() {
|
|
if err := s.agent.State.SyncChanges(); err != nil {
|
|
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var args structs.CheckDefinition
|
|
// Fixup the type decode of TTL or Interval.
|
|
decodeCB := func(raw interface{}) error {
|
|
return FixupCheckType(raw)
|
|
}
|
|
if err := decodeBody(req, &args, decodeCB); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Request decode failed: %v", err)
|
|
return nil, nil
|
|
}
|
|
|
|
// Verify the check has a name.
|
|
if args.Name == "" {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Missing check name")
|
|
return nil, nil
|
|
}
|
|
|
|
if args.Status != "" && !structs.ValidStatus(args.Status) {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Bad check status")
|
|
return nil, nil
|
|
}
|
|
|
|
// Construct the health check.
|
|
health := args.HealthCheck(s.agent.config.NodeName)
|
|
|
|
// Verify the check type.
|
|
chkType := args.CheckType()
|
|
err := chkType.Validate()
|
|
if err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err))
|
|
return nil, nil
|
|
}
|
|
|
|
if health.ServiceID != "" {
|
|
// fixup the service name so that vetCheckRegister requires the right ACLs
|
|
service := s.agent.State.Service(health.ServiceID)
|
|
if service != nil {
|
|
health.ServiceName = service.Service
|
|
}
|
|
}
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckRegister(token, health); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Add the check.
|
|
if err := s.agent.AddCheck(health, chkType, true, token, ConfigSourceRemote); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/"))
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.RemoveCheck(checkID, true); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
|
|
note := req.URL.Query().Get("note")
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.updateTTLCheck(checkID, api.HealthPassing, note); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
|
|
note := req.URL.Query().Get("note")
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.updateTTLCheck(checkID, api.HealthWarning, note); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
|
|
note := req.URL.Query().Get("note")
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.updateTTLCheck(checkID, api.HealthCritical, note); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
// checkUpdate is the payload for a PUT to AgentCheckUpdate.
|
|
type checkUpdate struct {
|
|
// Status us one of the api.Health* states, "passing", "warning", or
|
|
// "critical".
|
|
Status string
|
|
|
|
// Output is the information to post to the UI for operators as the
|
|
// output of the process that decided to hit the TTL check. This is
|
|
// different from the note field that's associated with the check
|
|
// itself.
|
|
Output string
|
|
}
|
|
|
|
// AgentCheckUpdate is a PUT-based alternative to the GET-based Pass/Warn/Fail
|
|
// APIs.
|
|
func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var update checkUpdate
|
|
if err := decodeBody(req, &update, nil); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Request decode failed: %v", err)
|
|
return nil, nil
|
|
}
|
|
|
|
switch update.Status {
|
|
case api.HealthPassing:
|
|
case api.HealthWarning:
|
|
case api.HealthCritical:
|
|
default:
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Invalid check status: '%s'", update.Status)
|
|
return nil, nil
|
|
}
|
|
|
|
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
|
|
return nil, err
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
// agentHealthService Returns Health for a given service ID
|
|
func agentHealthService(serviceID string, s *HTTPServer) (int, string, api.HealthChecks) {
|
|
checks := s.agent.State.Checks()
|
|
serviceChecks := make(api.HealthChecks, 0)
|
|
for _, c := range checks {
|
|
if c.ServiceID == serviceID || c.ServiceID == "" {
|
|
// TODO: harmonize struct.HealthCheck and api.HealthCheck (or at least extract conversion function)
|
|
healthCheck := &api.HealthCheck{
|
|
Node: c.Node,
|
|
CheckID: string(c.CheckID),
|
|
Name: c.Name,
|
|
Status: c.Status,
|
|
Notes: c.Notes,
|
|
Output: c.Output,
|
|
ServiceID: c.ServiceID,
|
|
ServiceName: c.ServiceName,
|
|
ServiceTags: c.ServiceTags,
|
|
}
|
|
serviceChecks = append(serviceChecks, healthCheck)
|
|
}
|
|
}
|
|
status := serviceChecks.AggregatedStatus()
|
|
switch status {
|
|
case api.HealthWarning:
|
|
return http.StatusTooManyRequests, status, serviceChecks
|
|
case api.HealthPassing:
|
|
return http.StatusOK, status, serviceChecks
|
|
default:
|
|
return http.StatusServiceUnavailable, status, serviceChecks
|
|
}
|
|
}
|
|
|
|
func returnTextPlain(req *http.Request) bool {
|
|
if contentType := req.Header.Get("Accept"); strings.HasPrefix(contentType, "text/plain") {
|
|
return true
|
|
}
|
|
if format := req.URL.Query().Get("format"); format != "" {
|
|
return format == "text"
|
|
}
|
|
return false
|
|
}
|
|
|
|
// AgentHealthServiceByID return the local Service Health given its ID
|
|
func (s *HTTPServer) AgentHealthServiceByID(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Pull out the service id (service id since there may be several instance of the same service on this host)
|
|
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/id/")
|
|
if serviceID == "" {
|
|
return nil, &BadRequestError{Reason: "Missing serviceID"}
|
|
}
|
|
services := s.agent.State.Services()
|
|
for _, service := range services {
|
|
if service.ID == serviceID {
|
|
code, status, healthChecks := agentHealthService(serviceID, s)
|
|
if returnTextPlain(req) {
|
|
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
|
|
}
|
|
serviceInfo := buildAgentService(service)
|
|
result := &api.AgentServiceChecksInfo{
|
|
AggregatedStatus: status,
|
|
Checks: healthChecks,
|
|
Service: &serviceInfo,
|
|
}
|
|
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
|
|
}
|
|
}
|
|
notFoundReason := fmt.Sprintf("ServiceId %s not found", serviceID)
|
|
if returnTextPlain(req) {
|
|
return notFoundReason, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: fmt.Sprintf("ServiceId %s not found", serviceID), ContentType: "application/json"}
|
|
}
|
|
return &api.AgentServiceChecksInfo{
|
|
AggregatedStatus: api.HealthCritical,
|
|
Checks: nil,
|
|
Service: nil,
|
|
}, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"}
|
|
}
|
|
|
|
// AgentHealthServiceByName return the worse status of all the services with given name on an agent
|
|
func (s *HTTPServer) AgentHealthServiceByName(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Pull out the service name
|
|
serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/name/")
|
|
if serviceName == "" {
|
|
return nil, &BadRequestError{Reason: "Missing service Name"}
|
|
}
|
|
code := http.StatusNotFound
|
|
status := fmt.Sprintf("ServiceName %s Not Found", serviceName)
|
|
services := s.agent.State.Services()
|
|
result := make([]api.AgentServiceChecksInfo, 0, 16)
|
|
for _, service := range services {
|
|
if service.Service == serviceName {
|
|
scode, sstatus, healthChecks := agentHealthService(service.ID, s)
|
|
serviceInfo := buildAgentService(service)
|
|
res := api.AgentServiceChecksInfo{
|
|
AggregatedStatus: sstatus,
|
|
Checks: healthChecks,
|
|
Service: &serviceInfo,
|
|
}
|
|
result = append(result, res)
|
|
// When service is not found, we ignore it and keep existing HTTP status
|
|
if code == http.StatusNotFound {
|
|
code = scode
|
|
status = sstatus
|
|
}
|
|
// We take the worst of all statuses, so we keep iterating
|
|
// passing: 200 < warning: 429 < critical: 503
|
|
if code < scode {
|
|
code = scode
|
|
status = sstatus
|
|
}
|
|
}
|
|
}
|
|
if returnTextPlain(req) {
|
|
return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"}
|
|
}
|
|
return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"}
|
|
}
|
|
|
|
func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var args structs.ServiceDefinition
|
|
// Fixup the type decode of TTL or Interval if a check if provided.
|
|
decodeCB := func(raw interface{}) error {
|
|
rawMap, ok := raw.(map[string]interface{})
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
// see https://github.com/hashicorp/consul/pull/3557 why we need this
|
|
// and why we should get rid of it.
|
|
lib.TranslateKeys(rawMap, map[string]string{
|
|
"enable_tag_override": "EnableTagOverride",
|
|
// Proxy Upstreams
|
|
"destination_name": "DestinationName",
|
|
"destination_type": "DestinationType",
|
|
"destination_namespace": "DestinationNamespace",
|
|
"local_bind_port": "LocalBindPort",
|
|
"local_bind_address": "LocalBindAddress",
|
|
// Proxy Config
|
|
"destination_service_name": "DestinationServiceName",
|
|
"destination_service_id": "DestinationServiceID",
|
|
"local_service_port": "LocalServicePort",
|
|
"local_service_address": "LocalServiceAddress",
|
|
// SidecarService
|
|
"sidecar_service": "SidecarService",
|
|
// Expose Config
|
|
"local_path_port": "LocalPathPort",
|
|
"listener_port": "ListenerPort",
|
|
|
|
// DON'T Recurse into these opaque config maps or we might mangle user's
|
|
// keys. Note empty canonical is a special sentinel to prevent recursion.
|
|
"Meta": "",
|
|
|
|
"tagged_addresses": "TaggedAddresses",
|
|
|
|
// upstreams is an array but this prevents recursion into config field of
|
|
// any item in the array.
|
|
"Proxy.Config": "",
|
|
"Proxy.Upstreams.Config": "",
|
|
"Connect.Proxy.Config": "",
|
|
"Connect.Proxy.Upstreams.Config": "",
|
|
|
|
// Same exceptions as above, but for a nested sidecar_service note we use
|
|
// the canonical form SidecarService since that is translated by the time
|
|
// the lookup here happens.
|
|
"Connect.SidecarService.Meta": "",
|
|
"Connect.SidecarService.Proxy.Config": "",
|
|
"Connect.SidecarService.Proxy.Upstreams.config": "",
|
|
})
|
|
|
|
for k, v := range rawMap {
|
|
switch strings.ToLower(k) {
|
|
case "check":
|
|
if err := FixupCheckType(v); err != nil {
|
|
return err
|
|
}
|
|
case "checks":
|
|
chkTypes, ok := v.([]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
for _, chkType := range chkTypes {
|
|
if err := FixupCheckType(chkType); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
if err := decodeBody(req, &args, decodeCB); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Request decode failed: %v", err)
|
|
return nil, nil
|
|
}
|
|
|
|
// Verify the service has a name.
|
|
if args.Name == "" {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Missing service name")
|
|
return nil, nil
|
|
}
|
|
|
|
// Check the service address here and in the catalog RPC endpoint
|
|
// since service registration isn't synchronous.
|
|
if ipaddr.IsAny(args.Address) {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Invalid service address")
|
|
return nil, nil
|
|
}
|
|
|
|
// Get the node service.
|
|
ns := args.NodeService()
|
|
if ns.Weights != nil {
|
|
if err := structs.ValidateWeights(ns.Weights); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, fmt.Errorf("Invalid Weights: %v", err))
|
|
return nil, nil
|
|
}
|
|
}
|
|
if err := structs.ValidateMetadata(ns.Meta, false); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err))
|
|
return nil, nil
|
|
}
|
|
|
|
// Run validation. This is the same validation that would happen on
|
|
// the catalog endpoint so it helps ensure the sync will work properly.
|
|
if err := ns.Validate(); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, err.Error())
|
|
return nil, nil
|
|
}
|
|
|
|
// Verify the check type.
|
|
chkTypes, err := args.CheckTypes()
|
|
if err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err))
|
|
return nil, nil
|
|
}
|
|
for _, check := range chkTypes {
|
|
if check.Status != "" && !structs.ValidStatus(check.Status) {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Status for checks must 'passing', 'warning', 'critical'")
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
// Verify the sidecar check types
|
|
if args.Connect != nil && args.Connect.SidecarService != nil {
|
|
chkTypes, err := args.Connect.SidecarService.CheckTypes()
|
|
if err != nil {
|
|
return nil, &BadRequestError{
|
|
Reason: fmt.Sprintf("Invalid check in sidecar_service: %v", err),
|
|
}
|
|
}
|
|
for _, check := range chkTypes {
|
|
if check.Status != "" && !structs.ValidStatus(check.Status) {
|
|
return nil, &BadRequestError{
|
|
Reason: "Status for checks must 'passing', 'warning', 'critical'",
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetServiceRegister(token, ns); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// See if we have a sidecar to register too
|
|
sidecar, sidecarChecks, sidecarToken, err := s.agent.sidecarServiceFromNodeService(ns, token)
|
|
if err != nil {
|
|
return nil, &BadRequestError{
|
|
Reason: fmt.Sprintf("Invalid SidecarService: %s", err)}
|
|
}
|
|
if sidecar != nil {
|
|
// Make sure we are allowed to register the sidecar using the token
|
|
// specified (might be specific to sidecar or the same one as the overall
|
|
// request).
|
|
if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil {
|
|
return nil, err
|
|
}
|
|
// We parsed the sidecar registration, now remove it from the NodeService
|
|
// for the actual service since it's done it's job and we don't want to
|
|
// persist it in the actual state/catalog. SidecarService is meant to be a
|
|
// registration syntax sugar so don't propagate it any further.
|
|
ns.Connect.SidecarService = nil
|
|
}
|
|
|
|
// Add the service.
|
|
replaceExistingChecks := false
|
|
|
|
query := req.URL.Query()
|
|
if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") {
|
|
replaceExistingChecks = true
|
|
}
|
|
|
|
if replaceExistingChecks {
|
|
if err := s.agent.AddServiceAndReplaceChecks(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
// Add sidecar.
|
|
if sidecar != nil {
|
|
if replaceExistingChecks {
|
|
if err := s.agent.AddServiceAndReplaceChecks(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/")
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetServiceUpdate(token, serviceID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.agent.RemoveService(serviceID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentServiceMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Ensure we have a service ID
|
|
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/maintenance/")
|
|
if serviceID == "" {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Missing service ID")
|
|
return nil, nil
|
|
}
|
|
|
|
// Ensure we have some action
|
|
params := req.URL.Query()
|
|
if _, ok := params["enable"]; !ok {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Missing value for enable")
|
|
return nil, nil
|
|
}
|
|
|
|
raw := params.Get("enable")
|
|
enable, err := strconv.ParseBool(raw)
|
|
if err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Invalid value for enable: %q", raw)
|
|
return nil, nil
|
|
}
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
if err := s.agent.vetServiceUpdate(token, serviceID); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if enable {
|
|
reason := params.Get("reason")
|
|
if err = s.agent.EnableServiceMaintenance(serviceID, reason, token); err != nil {
|
|
resp.WriteHeader(http.StatusNotFound)
|
|
fmt.Fprint(resp, err.Error())
|
|
return nil, nil
|
|
}
|
|
} else {
|
|
if err = s.agent.DisableServiceMaintenance(serviceID); err != nil {
|
|
resp.WriteHeader(http.StatusNotFound)
|
|
fmt.Fprint(resp, err.Error())
|
|
return nil, nil
|
|
}
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Ensure we have some action
|
|
params := req.URL.Query()
|
|
if _, ok := params["enable"]; !ok {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprint(resp, "Missing value for enable")
|
|
return nil, nil
|
|
}
|
|
|
|
raw := params.Get("enable")
|
|
enable, err := strconv.ParseBool(raw)
|
|
if err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Invalid value for enable: %q", raw)
|
|
return nil, nil
|
|
}
|
|
|
|
// Get the provided token, if any, and vet against any ACL policies.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// TODO (namespaces) - pass through a real ent authz ctx?
|
|
if rule != nil && rule.NodeWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
if enable {
|
|
s.agent.EnableNodeMaintenance(params.Get("reason"), token)
|
|
} else {
|
|
s.agent.DisableNodeMaintenance()
|
|
}
|
|
s.syncChanges()
|
|
return nil, nil
|
|
}
|
|
|
|
func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
// Get the provided loglevel.
|
|
logLevel := req.URL.Query().Get("loglevel")
|
|
if logLevel == "" {
|
|
logLevel = "INFO"
|
|
}
|
|
|
|
// Upper case the level since that's required by the filter.
|
|
logLevel = strings.ToUpper(logLevel)
|
|
|
|
// Create a level filter and flusher.
|
|
filter := logger.LevelFilter()
|
|
filter.MinLevel = logutils.LogLevel(logLevel)
|
|
if !logger.ValidateLevelFilter(filter.MinLevel, filter) {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Unknown log level: %s", filter.MinLevel)
|
|
return nil, nil
|
|
}
|
|
flusher, ok := resp.(http.Flusher)
|
|
if !ok {
|
|
return nil, fmt.Errorf("Streaming not supported")
|
|
}
|
|
|
|
// Set up a log handler.
|
|
handler := &httpLogHandler{
|
|
filter: filter,
|
|
logCh: make(chan string, 512),
|
|
logger: s.agent.logger,
|
|
}
|
|
s.agent.LogWriter.RegisterHandler(handler)
|
|
defer s.agent.LogWriter.DeregisterHandler(handler)
|
|
notify := resp.(http.CloseNotifier).CloseNotify()
|
|
|
|
// Send header so client can start streaming body
|
|
resp.WriteHeader(http.StatusOK)
|
|
|
|
// 0 byte write is needed before the Flush call so that if we are using
|
|
// a gzip stream it will go ahead and write out the HTTP response header
|
|
resp.Write([]byte(""))
|
|
flusher.Flush()
|
|
|
|
// Stream logs until the connection is closed.
|
|
for {
|
|
select {
|
|
case <-notify:
|
|
s.agent.LogWriter.DeregisterHandler(handler)
|
|
if handler.droppedCount > 0 {
|
|
s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount)
|
|
}
|
|
return nil, nil
|
|
case log := <-handler.logCh:
|
|
fmt.Fprintln(resp, log)
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
type httpLogHandler struct {
|
|
filter *logutils.LevelFilter
|
|
logCh chan string
|
|
logger *log.Logger
|
|
droppedCount int
|
|
}
|
|
|
|
func (h *httpLogHandler) HandleLog(log string) {
|
|
// Check the log level
|
|
if !h.filter.Check([]byte(log)) {
|
|
return
|
|
}
|
|
|
|
// Do a non-blocking send
|
|
select {
|
|
case h.logCh <- log:
|
|
default:
|
|
// Just increment a counter for dropped logs to this handler; we can't log now
|
|
// because the lock is already held by the LogWriter invoking this
|
|
h.droppedCount++
|
|
}
|
|
}
|
|
|
|
func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
if s.checkACLDisabled(resp, req) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rule != nil && rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
// The body is just the token, but it's in a JSON object so we can add
|
|
// fields to this later if needed.
|
|
var args api.AgentToken
|
|
if err := decodeBody(req, &args, nil); err != nil {
|
|
resp.WriteHeader(http.StatusBadRequest)
|
|
fmt.Fprintf(resp, "Request decode failed: %v", err)
|
|
return nil, nil
|
|
}
|
|
|
|
if s.agent.config.ACLEnableTokenPersistence {
|
|
// we hold the lock around updating the internal token store
|
|
// as well as persisting the tokens because we don't want to write
|
|
// into the store to have something else wipe it out before we can
|
|
// persist everything (like an agent config reload). The token store
|
|
// lock is only held for those operations so other go routines that
|
|
// just need to read some token out of the store will not be impacted
|
|
// any more than they would be without token persistence.
|
|
s.agent.persistedTokensLock.Lock()
|
|
defer s.agent.persistedTokensLock.Unlock()
|
|
}
|
|
|
|
// Figure out the target token.
|
|
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
|
|
triggerAntiEntropySync := false
|
|
switch target {
|
|
case "acl_token", "default":
|
|
changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI)
|
|
if changed {
|
|
triggerAntiEntropySync = true
|
|
}
|
|
|
|
case "acl_agent_token", "agent":
|
|
changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI)
|
|
if changed {
|
|
triggerAntiEntropySync = true
|
|
}
|
|
|
|
case "acl_agent_master_token", "agent_master":
|
|
s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI)
|
|
|
|
case "acl_replication_token", "replication":
|
|
s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI)
|
|
|
|
default:
|
|
resp.WriteHeader(http.StatusNotFound)
|
|
fmt.Fprintf(resp, "Token %q is unknown", target)
|
|
return nil, nil
|
|
}
|
|
|
|
if triggerAntiEntropySync {
|
|
s.agent.sync.SyncFull.Trigger()
|
|
}
|
|
|
|
if s.agent.config.ACLEnableTokenPersistence {
|
|
tokens := persistedTokens{}
|
|
|
|
if tok, source := s.agent.tokens.UserTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
|
tokens.Default = tok
|
|
}
|
|
|
|
if tok, source := s.agent.tokens.AgentTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
|
tokens.Agent = tok
|
|
}
|
|
|
|
if tok, source := s.agent.tokens.AgentMasterTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
|
tokens.AgentMaster = tok
|
|
}
|
|
|
|
if tok, source := s.agent.tokens.ReplicationTokenAndSource(); tok != "" && source == token_store.TokenSourceAPI {
|
|
tokens.Replication = tok
|
|
}
|
|
|
|
data, err := json.Marshal(tokens)
|
|
if err != nil {
|
|
s.agent.logger.Printf("[WARN] agent: failed to persist tokens - %v", err)
|
|
return nil, fmt.Errorf("Failed to marshal tokens for persistence: %v", err)
|
|
}
|
|
|
|
if err := file.WriteAtomicWithPerms(filepath.Join(s.agent.config.DataDir, tokensPath), data, 0600); err != nil {
|
|
s.agent.logger.Printf("[WARN] agent: failed to persist tokens - %v", err)
|
|
return nil, fmt.Errorf("Failed to persist tokens - %v", err)
|
|
}
|
|
}
|
|
|
|
s.agent.logger.Printf("[INFO] agent: Updated agent's ACL token %q", target)
|
|
return nil, nil
|
|
}
|
|
|
|
// AgentConnectCARoots returns the trusted CA roots.
|
|
func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
var args structs.DCSpecificRequest
|
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
|
return nil, nil
|
|
}
|
|
|
|
raw, m, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer setCacheMeta(resp, &m)
|
|
|
|
// Add cache hit
|
|
|
|
reply, ok := raw.(*structs.IndexedCARoots)
|
|
if !ok {
|
|
// This should never happen, but we want to protect against panics
|
|
return nil, fmt.Errorf("internal error: response type not correct")
|
|
}
|
|
defer setMeta(resp, &reply.QueryMeta)
|
|
|
|
return *reply, nil
|
|
}
|
|
|
|
// AgentConnectCALeafCert returns the certificate bundle for a service
|
|
// instance. This supports blocking queries to update the returned bundle.
|
|
func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Get the service name. Note that this is the name of the service,
|
|
// not the ID of the service instance.
|
|
serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/ca/leaf/")
|
|
|
|
args := cachetype.ConnectCALeafRequest{
|
|
Service: serviceName, // Need name not ID
|
|
}
|
|
var qOpts structs.QueryOptions
|
|
|
|
// Store DC in the ConnectCALeafRequest but query opts separately
|
|
if done := s.parse(resp, req, &args.Datacenter, &qOpts); done {
|
|
return nil, nil
|
|
}
|
|
args.MinQueryIndex = qOpts.MinQueryIndex
|
|
args.MaxQueryTime = qOpts.MaxQueryTime
|
|
args.Token = qOpts.Token
|
|
|
|
raw, m, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer setCacheMeta(resp, &m)
|
|
|
|
reply, ok := raw.(*structs.IssuedCert)
|
|
if !ok {
|
|
// This should never happen, but we want to protect against panics
|
|
return nil, fmt.Errorf("internal error: response type not correct")
|
|
}
|
|
setIndex(resp, reply.ModifyIndex)
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
// AgentConnectAuthorize
|
|
//
|
|
// POST /v1/agent/connect/authorize
|
|
//
|
|
// Note: when this logic changes, consider if the Intention.Check RPC method
|
|
// also needs to be updated.
|
|
func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the token
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
|
|
// Decode the request from the request body
|
|
var authReq structs.ConnectAuthorizeRequest
|
|
if err := decodeBody(req, &authReq, nil); err != nil {
|
|
return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)}
|
|
}
|
|
|
|
authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
setCacheMeta(resp, cacheMeta)
|
|
|
|
return &connectAuthorizeResp{
|
|
Authorized: authz,
|
|
Reason: reason,
|
|
}, nil
|
|
}
|
|
|
|
// connectAuthorizeResp is the response format/structure for the
|
|
// /v1/agent/connect/authorize endpoint.
|
|
type connectAuthorizeResp struct {
|
|
Authorized bool // True if authorized, false if not
|
|
Reason string // Reason for the Authorized value (whether true or false)
|
|
}
|
|
|
|
// AgentHost
|
|
//
|
|
// GET /v1/agent/host
|
|
//
|
|
// Retrieves information about resources available and in-use for the
|
|
// host the agent is running on such as CPU, memory, and disk usage. Requires
|
|
// a operator:read ACL token.
|
|
func (s *HTTPServer) AgentHost(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
// Fetch the ACL token, if any, and enforce agent policy.
|
|
var token string
|
|
s.parseToken(req, &token)
|
|
rule, err := s.agent.resolveToken(token)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO (namespaces) - pass through a real ent authz ctx
|
|
if rule != nil && rule.OperatorRead(nil) != acl.Allow {
|
|
return nil, acl.ErrPermissionDenied
|
|
}
|
|
|
|
return debug.CollectHostInfo(), nil
|
|
}
|