VAULT-1564 report in-flight requests (#13024)

* VAULT-1564 report in-flight requests

* adding a changelog

* Changing some variable names and fixing comments

* minor style change

* adding unauthenticated support for in-flight-req

* adding documentation for the listener.profiling stanza

* adding an atomic counter for the inflight requests
addressing comments

* addressing comments

* logging completed requests

* fixing a test

* providing log_requests_info as a config option to determine at which level requests should be logged

* removing a member and a method from the StatusHeaderResponseWriter struct

* adding api docks

* revert changes in NewHTTPResponseWriter

* Fix logging invalid log_requests_info value

* Addressing comments

* Fixing a test

* use an tomic value for logRequestsInfo, and moving the CreateClientID function to Core

* fixing go.sum

* minor refactoring

* protecting InFlightRequests from data race

* another try on fixing a data race

* another try to fix a data race

* addressing comments

* fixing couple of tests

* changing log_requests_info to log_requests_level

* minor style change

* fixing a test

* removing the lock in InFlightRequests

* use single-argument form for interface assertion

* adding doc for the new configuration paramter

* adding the new doc to the nav data file

* minor fix
This commit is contained in:
hghaf099 2021-12-08 17:34:42 -05:00 committed by GitHub
parent c97c8687f4
commit 65845c7531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 714 additions and 151 deletions

3
changelog/13024.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
**Report in-flight requests**:Adding a trace capability to show in-flight requests, and a new gauge metric to show the total number of in-flight requests
```

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/go-secure-stdlib/gatedwriter" "github.com/hashicorp/go-secure-stdlib/gatedwriter"
"github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/vault/api" "github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/helper/logging" "github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/version" "github.com/hashicorp/vault/sdk/version"
"github.com/mholt/archiver" "github.com/mholt/archiver"
@ -106,6 +107,7 @@ type DebugCommand struct {
metricsCollection []map[string]interface{} metricsCollection []map[string]interface{}
replicationStatusCollection []map[string]interface{} replicationStatusCollection []map[string]interface{}
serverStatusCollection []map[string]interface{} serverStatusCollection []map[string]interface{}
inFlightReqStatusCollection []map[string]interface{}
// cachedClient holds the client retrieved during preflight // cachedClient holds the client retrieved during preflight
cachedClient *api.Client cachedClient *api.Client
@ -480,7 +482,7 @@ func (c *DebugCommand) preflight(rawArgs []string) (string, error) {
} }
func (c *DebugCommand) defaultTargets() []string { func (c *DebugCommand) defaultTargets() []string {
return []string{"config", "host", "metrics", "pprof", "replication-status", "server-status", "log"} return []string{"config", "host", "requests", "metrics", "pprof", "replication-status", "server-status", "log"}
} }
func (c *DebugCommand) captureStaticTargets() error { func (c *DebugCommand) captureStaticTargets() error {
@ -492,6 +494,7 @@ func (c *DebugCommand) captureStaticTargets() error {
if err != nil { if err != nil {
c.captureError("config", err) c.captureError("config", err)
c.logger.Error("config: error capturing config state", "error", err) c.logger.Error("config: error capturing config state", "error", err)
return nil
} }
if resp != nil && resp.Data != nil { if resp != nil && resp.Data != nil {
@ -580,6 +583,16 @@ func (c *DebugCommand) capturePollingTargets() error {
}) })
} }
// Collect in-flight request status if target is specified
if strutil.StrListContains(c.flagTargets, "requests") {
g.Add(func() error {
c.collectInFlightRequestStatus(ctx)
return nil
}, func(error) {
cancelFunc()
})
}
if strutil.StrListContains(c.flagTargets, "log") { if strutil.StrListContains(c.flagTargets, "log") {
g.Add(func() error { g.Add(func() error {
c.writeLogs(ctx) c.writeLogs(ctx)
@ -611,7 +624,9 @@ func (c *DebugCommand) capturePollingTargets() error {
if err := c.persistCollection(c.hostInfoCollection, "host_info.json"); err != nil { if err := c.persistCollection(c.hostInfoCollection, "host_info.json"); err != nil {
c.UI.Error(fmt.Sprintf("Error writing data to %s: %v", "host_info.json", err)) c.UI.Error(fmt.Sprintf("Error writing data to %s: %v", "host_info.json", err))
} }
if err := c.persistCollection(c.inFlightReqStatusCollection, "requests.json"); err != nil {
c.UI.Error(fmt.Sprintf("Error writing data to %s: %v", "requests.json", err))
}
return nil return nil
} }
@ -635,6 +650,7 @@ func (c *DebugCommand) collectHostInfo(ctx context.Context) {
resp, err := c.cachedClient.RawRequestWithContext(ctx, r) resp, err := c.cachedClient.RawRequestWithContext(ctx, r)
if err != nil { if err != nil {
c.captureError("host", err) c.captureError("host", err)
return
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
@ -642,6 +658,7 @@ func (c *DebugCommand) collectHostInfo(ctx context.Context) {
secret, err := api.ParseSecret(resp.Body) secret, err := api.ParseSecret(resp.Body)
if err != nil { if err != nil {
c.captureError("host", err) c.captureError("host", err)
return
} }
if secret != nil && secret.Data != nil { if secret != nil && secret.Data != nil {
hostEntry := secret.Data hostEntry := secret.Data
@ -829,6 +846,7 @@ func (c *DebugCommand) collectReplicationStatus(ctx context.Context) {
resp, err := c.cachedClient.RawRequestWithContext(ctx, r) resp, err := c.cachedClient.RawRequestWithContext(ctx, r)
if err != nil { if err != nil {
c.captureError("replication-status", err) c.captureError("replication-status", err)
return
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
@ -836,6 +854,7 @@ func (c *DebugCommand) collectReplicationStatus(ctx context.Context) {
secret, err := api.ParseSecret(resp.Body) secret, err := api.ParseSecret(resp.Body)
if err != nil { if err != nil {
c.captureError("replication-status", err) c.captureError("replication-status", err)
return
} }
if secret != nil && secret.Data != nil { if secret != nil && secret.Data != nil {
replicationEntry := secret.Data replicationEntry := secret.Data
@ -880,6 +899,48 @@ func (c *DebugCommand) collectServerStatus(ctx context.Context) {
} }
} }
func (c *DebugCommand) collectInFlightRequestStatus(ctx context.Context) {
idxCount := 0
intervalTicker := time.Tick(c.flagInterval)
for {
if idxCount > 0 {
select {
case <-ctx.Done():
return
case <-intervalTicker:
}
}
c.logger.Info("capturing in-flight request status", "count", idxCount)
idxCount++
req := c.cachedClient.NewRequest("GET", "/v1/sys/in-flight-req")
resp, err := c.cachedClient.RawRequestWithContext(ctx, req)
if err != nil {
c.captureError("requests", err)
return
}
var data map[string]interface{}
if resp != nil {
defer resp.Body.Close()
err = jsonutil.DecodeJSONFromReader(resp.Body, &data)
if err != nil {
c.captureError("requests", err)
return
}
statusEntry := map[string]interface{}{
"timestamp": time.Now().UTC(),
"in_flight_requests": data,
}
c.inFlightReqStatusCollection = append(c.inFlightReqStatusCollection, statusEntry)
}
}
}
// persistCollection writes the collected data for a particular target onto the // persistCollection writes the collected data for a particular target onto the
// specified file. If the collection is empty, it returns immediately. // specified file. If the collection is empty, it returns immediately.
func (c *DebugCommand) persistCollection(collection []map[string]interface{}, outFile string) error { func (c *DebugCommand) persistCollection(collection []map[string]interface{}, outFile string) error {

View File

@ -235,6 +235,11 @@ func TestDebugCommand_CaptureTargets(t *testing.T) {
[]string{"server-status"}, []string{"server-status"},
[]string{"server_status.json"}, []string{"server_status.json"},
}, },
{
"in-flight-req",
[]string{"requests"},
[]string{"requests.json"},
},
{ {
"all-minus-pprof", "all-minus-pprof",
[]string{"config", "host", "metrics", "replication-status", "server-status"}, []string{"config", "host", "metrics", "replication-status", "server-status"},

View File

@ -1547,6 +1547,9 @@ func (c *ServerCommand) Run(args []string) int {
c.logger.Error(err.Error()) c.logger.Error(err.Error())
} }
// Setting log request with the new value in the config after reload
core.ReloadLogRequestsLevel()
if config.LogLevel != "" { if config.LogLevel != "" {
configLogLevel := strings.ToLower(strings.TrimSpace(config.LogLevel)) configLogLevel := strings.ToLower(strings.TrimSpace(config.LogLevel))
switch configLogLevel { switch configLogLevel {

View File

@ -82,6 +82,9 @@ type Config struct {
EnableResponseHeaderHostname bool `hcl:"-"` EnableResponseHeaderHostname bool `hcl:"-"`
EnableResponseHeaderHostnameRaw interface{} `hcl:"enable_response_header_hostname"` EnableResponseHeaderHostnameRaw interface{} `hcl:"enable_response_header_hostname"`
LogRequestsLevel string `hcl:"-"`
LogRequestsLevelRaw interface{} `hcl:"log_requests_level"`
EnableResponseHeaderRaftNodeID bool `hcl:"-"` EnableResponseHeaderRaftNodeID bool `hcl:"-"`
EnableResponseHeaderRaftNodeIDRaw interface{} `hcl:"enable_response_header_raft_node_id"` EnableResponseHeaderRaftNodeIDRaw interface{} `hcl:"enable_response_header_raft_node_id"`
@ -320,6 +323,11 @@ func (c *Config) Merge(c2 *Config) *Config {
result.EnableResponseHeaderHostname = c2.EnableResponseHeaderHostname result.EnableResponseHeaderHostname = c2.EnableResponseHeaderHostname
} }
result.LogRequestsLevel = c.LogRequestsLevel
if c2.LogRequestsLevel != "" {
result.LogRequestsLevel = c2.LogRequestsLevel
}
result.EnableResponseHeaderRaftNodeID = c.EnableResponseHeaderRaftNodeID result.EnableResponseHeaderRaftNodeID = c.EnableResponseHeaderRaftNodeID
if c2.EnableResponseHeaderRaftNodeID { if c2.EnableResponseHeaderRaftNodeID {
result.EnableResponseHeaderRaftNodeID = c2.EnableResponseHeaderRaftNodeID result.EnableResponseHeaderRaftNodeID = c2.EnableResponseHeaderRaftNodeID
@ -508,6 +516,11 @@ func ParseConfig(d, source string) (*Config, error) {
} }
} }
if result.LogRequestsLevelRaw != nil {
result.LogRequestsLevel = strings.ToLower(strings.TrimSpace(result.LogRequestsLevelRaw.(string)))
result.LogRequestsLevelRaw = ""
}
if result.EnableResponseHeaderRaftNodeIDRaw != nil { if result.EnableResponseHeaderRaftNodeIDRaw != nil {
if result.EnableResponseHeaderRaftNodeID, err = parseutil.ParseBool(result.EnableResponseHeaderRaftNodeIDRaw); err != nil { if result.EnableResponseHeaderRaftNodeID, err = parseutil.ParseBool(result.EnableResponseHeaderRaftNodeIDRaw); err != nil {
return nil, err return nil, err
@ -945,6 +958,8 @@ func (c *Config) Sanitized() map[string]interface{} {
"enable_response_header_hostname": c.EnableResponseHeaderHostname, "enable_response_header_hostname": c.EnableResponseHeaderHostname,
"enable_response_header_raft_node_id": c.EnableResponseHeaderRaftNodeID, "enable_response_header_raft_node_id": c.EnableResponseHeaderRaftNodeID,
"log_requests_level": c.LogRequestsLevel,
} }
for k, v := range sharedResult { for k, v := range sharedResult {
result[k] = v result[k] = v

View File

@ -701,6 +701,7 @@ func testConfig_Sanitized(t *testing.T) {
"enable_ui": true, "enable_ui": true,
"enable_response_header_hostname": false, "enable_response_header_hostname": false,
"enable_response_header_raft_node_id": false, "enable_response_header_raft_node_id": false,
"log_requests_level": "basic",
"ha_storage": map[string]interface{}{ "ha_storage": map[string]interface{}{
"cluster_addr": "top_level_cluster_addr", "cluster_addr": "top_level_cluster_addr",
"disable_clustering": true, "disable_clustering": true,

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"net" "net"
"testing" "testing"
) )
type testListenerConnFn func(net.Listener) (net.Conn, error) type testListenerConnFn func(net.Listener) (net.Conn, error)
@ -66,3 +67,15 @@ func testListenerImpl(t *testing.T, ln net.Listener, connFn testListenerConnFn,
t.Fatalf("bad: %v", buf.String()) t.Fatalf("bad: %v", buf.String())
} }
} }
func TestProfilingUnauthenticatedInFlightAccess(t *testing.T) {
config, err := LoadConfigFile("./test-fixtures/unauth_in_flight_access.hcl")
if err != nil {
t.Fatalf("Error encountered when loading config %+v", err)
}
if !config.Listeners[0].InFlightRequestLogging.UnauthenticatedInFlightAccess {
t.Fatalf("failed to read UnauthenticatedInFlightAccess")
}
}

View File

@ -1,5 +1,6 @@
disable_cache = true disable_cache = true
disable_mlock = true disable_mlock = true
log_requests_level = "Basic"
ui = true ui = true

View File

@ -0,0 +1,9 @@
storage "inmem" {}
listener "tcp" {
address = "127.0.0.1:8200"
tls_disable = true
inflight_requests_logging {
unauthenticated_in_flight_requests_access = true
}
}
disable_mlock = true

View File

@ -25,6 +25,7 @@ import (
"github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-secure-stdlib/parseutil" "github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-sockaddr" "github.com/hashicorp/go-sockaddr"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/internalshared/configutil" "github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/consts"
@ -195,6 +196,11 @@ func Handler(props *vault.HandlerProperties) http.Handler {
mux.Handle("/v1/sys/pprof/", handleLogicalNoForward(core)) mux.Handle("/v1/sys/pprof/", handleLogicalNoForward(core))
} }
if props.ListenerConfig != nil && props.ListenerConfig.InFlightRequestLogging.UnauthenticatedInFlightAccess {
mux.Handle("/v1/sys/in-flight-req", handleUnAuthenticatedInFlightRequest(core))
} else {
mux.Handle("/v1/sys/in-flight-req", handleLogicalNoForward(core))
}
additionalRoutes(mux, core) additionalRoutes(mux, core)
} }
@ -314,8 +320,11 @@ func wrapGenericHandler(core *vault.Core, h http.Handler, props *vault.HandlerPr
customHeaders = listenerCustomHeaders.StatusCodeHeaderMap customHeaders = listenerCustomHeaders.StatusCodeHeaderMap
} }
} }
// saving start time for the in-flight requests
inFlightReqStartTime := time.Now()
nw := logical.NewStatusHeaderResponseWriter(w, customHeaders) nw := logical.NewStatusHeaderResponseWriter(w, customHeaders)
// Set the Cache-Control header for all the responses returned // Set the Cache-Control header for all the responses returned
// by Vault // by Vault
nw.Header().Set("Cache-Control", "no-store") nw.Header().Set("Cache-Control", "no-store")
@ -367,6 +376,43 @@ func wrapGenericHandler(core *vault.Core, h http.Handler, props *vault.HandlerPr
return return
} }
// The uuid for the request is going to be generated when a logical
// request is generated. But, here we generate one to be able to track
// in-flight requests, and use that to update the req data with clientID
inFlightReqID, err := uuid.GenerateUUID()
if err != nil {
respondError(nw, http.StatusInternalServerError, fmt.Errorf("failed to generate an identifier for the in-flight request"))
}
// adding an entry to the context to enable updating in-flight
// data with ClientID in the logical layer
r = r.WithContext(context.WithValue(r.Context(), logical.CtxKeyInFlightRequestID{}, inFlightReqID))
// extracting the client address to be included in the in-flight request
var clientAddr string
headers := r.Header[textproto.CanonicalMIMEHeaderKey("X-Forwarded-For")]
if len(headers) == 0 {
clientAddr = r.RemoteAddr
} else {
clientAddr = headers[0]
}
// getting the request method
requestMethod := r.Method
// Storing the in-flight requests. Path should include namespace as well
core.StoreInFlightReqData(
inFlightReqID,
vault.InFlightReqData {
StartTime: inFlightReqStartTime,
ReqPath: r.URL.Path,
ClientRemoteAddr: clientAddr,
Method: requestMethod,
})
defer func() {
// Not expecting this fail, so skipping the assertion check
core.FinalizeInFlightReqData(inFlightReqID, nw.StatusCode)
}()
// Setting the namespace in the header to be included in the error message // Setting the namespace in the header to be included in the error message
ns := r.Header.Get(consts.NamespaceHeaderName) ns := r.Header.Get(consts.NamespaceHeaderName)
if ns != "" { if ns != "" {

View File

@ -294,6 +294,45 @@ func TestHandler_CacheControlNoStore(t *testing.T) {
} }
} }
func TestHandler_InFlightRequest(t *testing.T) {
core, _, token := vault.TestCoreUnsealed(t)
ln, addr := TestServer(t, core)
defer ln.Close()
TestServerAuth(t, addr, token)
req, err := http.NewRequest("GET", addr+"/v1/sys/in-flight-req", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
req.Header.Set(consts.AuthHeaderName, token)
client := cleanhttp.DefaultClient()
resp, err := client.Do(req)
if err != nil {
t.Fatalf("err: %s", err)
}
if resp == nil {
t.Fatalf("nil response")
}
var actual map[string]interface{}
testResponseStatus(t, resp, 200)
testResponseBody(t, resp, &actual)
if actual == nil || len(actual) == 0 {
t.Fatal("expected to get at least one in-flight request, got nil or zero length map")
}
for _, v := range actual {
reqInfo, ok := v.(map[string]interface{})
if !ok {
t.Fatal("failed to read in-flight request")
}
if reqInfo["request_path"] != "/v1/sys/in-flight-req" {
t.Fatalf("expected /v1/sys/in-flight-req in-flight request path, got %s", actual["request_path"])
}
}
}
// TestHandler_MissingToken tests the response / error code if a request comes // TestHandler_MissingToken tests the response / error code if a request comes
// in with a missing client token. See // in with a missing client token. See
// https://github.com/hashicorp/vault/issues/8377 // https://github.com/hashicorp/vault/issues/8377

View File

@ -183,7 +183,7 @@ func buildLogicalRequestNoAuth(perfStandby bool, w http.ResponseWriter, r *http.
requestId, err := uuid.GenerateUUID() requestId, err := uuid.GenerateUUID()
if err != nil { if err != nil {
return nil, nil, http.StatusBadRequest, fmt.Errorf("failed to generate identifier for the request: %w", err) return nil, nil, http.StatusInternalServerError, fmt.Errorf("failed to generate identifier for the request: %w", err)
} }
req := &logical.Request{ req := &logical.Request{

View File

@ -48,6 +48,7 @@ func TestSysConfigState_Sanitized(t *testing.T) {
"plugin_directory": "", "plugin_directory": "",
"enable_response_header_hostname": false, "enable_response_header_hostname": false,
"enable_response_header_raft_node_id": false, "enable_response_header_raft_node_id": false,
"log_requests_level": "",
} }
expected = map[string]interface{}{ expected = map[string]interface{}{

View File

@ -0,0 +1,23 @@
package http
import (
"net/http"
"github.com/hashicorp/vault/vault"
)
func handleUnAuthenticatedInFlightRequest(core *vault.Core) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
default:
respondError(w, http.StatusMethodNotAllowed, nil)
return
}
currentInFlightReqMap := core.LoadInFlightReqData()
respondOk(w, currentInFlightReqMap)
})
}

View File

@ -0,0 +1,46 @@
package http
import (
"testing"
"github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/vault"
)
func TestInFlightRequestUnauthenticated(t *testing.T) {
conf := &vault.CoreConfig{}
core, _, token := vault.TestCoreUnsealedWithConfig(t, conf)
ln, addr := TestServer(t, core)
TestServerAuth(t, addr, token)
// Default: Only authenticated access
resp := testHttpGet(t, "", addr+"/v1/sys/in-flight-req")
testResponseStatus(t, resp, 403)
resp = testHttpGet(t, token, addr+"/v1/sys/in-flight-req")
testResponseStatus(t, resp, 200)
// Close listener
ln.Close()
// Setup new custom listener with unauthenticated metrics access
ln, addr = TestListener(t)
props := &vault.HandlerProperties{
Core: core,
ListenerConfig: &configutil.Listener{
InFlightRequestLogging: configutil.ListenerInFlightRequestLogging{
UnauthenticatedInFlightAccess: true,
},
},
}
TestServerWithListenerAndProperties(t, ln, addr, core, props)
defer ln.Close()
TestServerAuth(t, addr, token)
// Test without token
resp = testHttpGet(t, "", addr+"/v1/sys/in-flight-req")
testResponseStatus(t, resp, 200)
// Should also work with token
resp = testHttpGet(t, token, addr+"/v1/sys/in-flight-req")
testResponseStatus(t, resp, 200)
}

View File

@ -24,9 +24,15 @@ type ListenerTelemetry struct {
} }
type ListenerProfiling struct { type ListenerProfiling struct {
UnusedKeys UnusedKeyMap `hcl:",unusedKeyPositions"` UnusedKeys UnusedKeyMap `hcl:",unusedKeyPositions"`
UnauthenticatedPProfAccess bool `hcl:"-"` UnauthenticatedPProfAccess bool `hcl:"-"`
UnauthenticatedPProfAccessRaw interface{} `hcl:"unauthenticated_pprof_access,alias:UnauthenticatedPProfAccessRaw"` UnauthenticatedPProfAccessRaw interface{} `hcl:"unauthenticated_pprof_access,alias:UnauthenticatedPProfAccessRaw"`
}
type ListenerInFlightRequestLogging struct {
UnusedKeys UnusedKeyMap `hcl:",unusedKeyPositions"`
UnauthenticatedInFlightAccess bool `hcl:"-"`
UnauthenticatedInFlightAccessRaw interface{} `hcl:"unauthenticated_in_flight_requests_access,alias:unauthenticatedInFlightAccessRaw"`
} }
// Listener is the listener configuration for the server. // Listener is the listener configuration for the server.
@ -87,8 +93,9 @@ type Listener struct {
SocketUser string `hcl:"socket_user"` SocketUser string `hcl:"socket_user"`
SocketGroup string `hcl:"socket_group"` SocketGroup string `hcl:"socket_group"`
Telemetry ListenerTelemetry `hcl:"telemetry"` Telemetry ListenerTelemetry `hcl:"telemetry"`
Profiling ListenerProfiling `hcl:"profiling"` Profiling ListenerProfiling `hcl:"profiling"`
InFlightRequestLogging ListenerInFlightRequestLogging `hcl:"inflight_requests_logging"`
// RandomPort is used only for some testing purposes // RandomPort is used only for some testing purposes
RandomPort bool `hcl:"-"` RandomPort bool `hcl:"-"`
@ -345,6 +352,17 @@ func ParseListeners(result *SharedConfig, list *ast.ObjectList) error {
} }
} }
// InFlight Request logging
{
if l.InFlightRequestLogging.UnauthenticatedInFlightAccessRaw != nil {
if l.InFlightRequestLogging.UnauthenticatedInFlightAccess, err = parseutil.ParseBool(l.InFlightRequestLogging.UnauthenticatedInFlightAccessRaw); err != nil {
return multierror.Prefix(fmt.Errorf("invalid value for inflight_requests_logging.unauthenticated_in_flight_requests_access: %w", err), fmt.Sprintf("listeners.%d", i))
}
l.InFlightRequestLogging.UnauthenticatedInFlightAccessRaw = ""
}
}
// CORS // CORS
{ {
if l.CorsEnabledRaw != nil { if l.CorsEnabledRaw != nil {

View File

@ -382,3 +382,9 @@ type CustomHeader struct {
Name string Name string
Value string Value string
} }
type CtxKeyInFlightRequestID struct{}
func (c CtxKeyInFlightRequestID) String() string {
return "in-flight-request-ID"
}

View File

@ -228,7 +228,7 @@ type WrappingResponseWriter interface {
type StatusHeaderResponseWriter struct { type StatusHeaderResponseWriter struct {
wrapped http.ResponseWriter wrapped http.ResponseWriter
wroteHeader bool wroteHeader bool
statusCode int StatusCode int
headers map[string][]*CustomHeader headers map[string][]*CustomHeader
} }
@ -236,7 +236,7 @@ func NewStatusHeaderResponseWriter(w http.ResponseWriter, h map[string][]*Custom
return &StatusHeaderResponseWriter{ return &StatusHeaderResponseWriter{
wrapped: w, wrapped: w,
wroteHeader: false, wroteHeader: false,
statusCode: 200, StatusCode: 200,
headers: h, headers: h,
} }
} }
@ -259,7 +259,7 @@ func (w *StatusHeaderResponseWriter) Write(buf []byte) (int, error) {
// statusHeaderResponseWriter struct are called the internal call to the // statusHeaderResponseWriter struct are called the internal call to the
// WriterHeader invoked from inside Write method won't change the headers. // WriterHeader invoked from inside Write method won't change the headers.
if !w.wroteHeader { if !w.wroteHeader {
w.setCustomResponseHeaders(w.statusCode) w.setCustomResponseHeaders(w.StatusCode)
} }
return w.wrapped.Write(buf) return w.wrapped.Write(buf)
@ -268,7 +268,7 @@ func (w *StatusHeaderResponseWriter) Write(buf []byte) (int, error) {
func (w *StatusHeaderResponseWriter) WriteHeader(statusCode int) { func (w *StatusHeaderResponseWriter) WriteHeader(statusCode int) {
w.setCustomResponseHeaders(statusCode) w.setCustomResponseHeaders(statusCode)
w.wrapped.WriteHeader(statusCode) w.wrapped.WriteHeader(statusCode)
w.statusCode = statusCode w.StatusCode = statusCode
// in cases where Write is called after WriteHeader, let's prevent setting // in cases where Write is called after WriteHeader, let's prevent setting
// ResponseWriter headers twice // ResponseWriter headers twice
w.wroteHeader = true w.wroteHeader = true

View File

@ -1,7 +1,11 @@
package logical package logical
import ( import (
"crypto/sha256"
"encoding/base64"
"fmt" "fmt"
"sort"
"strings"
"time" "time"
sockaddr "github.com/hashicorp/go-sockaddr" sockaddr "github.com/hashicorp/go-sockaddr"
@ -20,13 +24,24 @@ const (
// TokenTypeBatch is a batch token // TokenTypeBatch is a batch token
TokenTypeBatch TokenTypeBatch
// TokenTypeDefaultService, configured on a mount, means that if // TokenTypeDefaultService configured on a mount, means that if
// TokenTypeDefault is sent back by the mount, create Service tokens // TokenTypeDefault is sent back by the mount, create Service tokens
TokenTypeDefaultService TokenTypeDefaultService
// TokenTypeDefaultBatch, configured on a mount, means that if // TokenTypeDefaultBatch configured on a mount, means that if
// TokenTypeDefault is sent back by the mount, create Batch tokens // TokenTypeDefault is sent back by the mount, create Batch tokens
TokenTypeDefaultBatch TokenTypeDefaultBatch
// ClientIDTWEDelimiter Delimiter between the string fields used to generate a client
// ID for tokens without entities. This is the 0 character, which
// is a non-printable string. Please see unicode.IsPrint for details.
ClientIDTWEDelimiter = rune('\x00')
// SortedPoliciesTWEDelimiter Delimiter between each policy in the sorted policies used to
// generate a client ID for tokens without entities. This is the 127
// character, which is a non-printable string. Please see unicode.IsPrint
// for details.
SortedPoliciesTWEDelimiter = rune('\x7F')
) )
func (t *TokenType) UnmarshalJSON(b []byte) error { func (t *TokenType) UnmarshalJSON(b []byte) error {
@ -154,6 +169,46 @@ type TokenEntry struct {
CubbyholeID string `json:"cubbyhole_id" mapstructure:"cubbyhole_id" structs:"cubbyhole_id" sentinel:""` CubbyholeID string `json:"cubbyhole_id" mapstructure:"cubbyhole_id" structs:"cubbyhole_id" sentinel:""`
} }
// CreateClientID returns the client ID, and a boolean which is false if the clientID
// has an entity, and true otherwise
func (te *TokenEntry) CreateClientID() (string, bool) {
var clientIDInputBuilder strings.Builder
// if entry has an associated entity ID, return it
if te.EntityID != "" {
return te.EntityID, false
}
// The entry is associated with a TWE (token without entity). In this case
// we must create a client ID by calculating the following formula:
// clientID = SHA256(sorted policies + namespace)
// Step 1: Copy entry policies to a new struct
sortedPolicies := make([]string, len(te.Policies))
copy(sortedPolicies, te.Policies)
// Step 2: Sort and join copied policies
sort.Strings(sortedPolicies)
for _, pol := range sortedPolicies {
clientIDInputBuilder.WriteRune(SortedPoliciesTWEDelimiter)
clientIDInputBuilder.WriteString(pol)
}
// Step 3: Add namespace ID
clientIDInputBuilder.WriteRune(ClientIDTWEDelimiter)
clientIDInputBuilder.WriteString(te.NamespaceID)
if clientIDInputBuilder.Len() == 0 {
return "", true
}
// Step 4: Remove the first character in the string, as it's an unnecessary delimiter
clientIDInput := clientIDInputBuilder.String()[1:]
// Step 5: Hash the sum
hashed := sha256.Sum256([]byte(clientIDInput))
return base64.StdEncoding.EncodeToString(hashed[:]), true
}
func (te *TokenEntry) SentinelGet(key string) (interface{}, error) { func (te *TokenEntry) SentinelGet(key string) (interface{}, error) {
if te == nil { if te == nil {
return nil, nil return nil, nil

View File

@ -1,6 +1,8 @@
package logical package logical
import ( import (
"crypto/sha256"
"encoding/base64"
"encoding/json" "encoding/json"
"testing" "testing"
) )
@ -41,3 +43,61 @@ func TestJSONSerialization(t *testing.T) {
t.Fatalf("expected %v, got %v", tt, utt) t.Fatalf("expected %v, got %v", tt, utt)
} }
} }
// TestCreateClientID verifies that CreateClientID uses the entity ID for a token
// entry if one exists, and creates an appropriate client ID otherwise.
func TestCreateClientID(t *testing.T) {
entry := TokenEntry{NamespaceID: "namespaceFoo", Policies: []string{"bar", "baz", "foo", "banana"}}
id, isTWE := entry.CreateClientID()
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext := "banana" + string(SortedPoliciesTWEDelimiter) + "bar" +
string(SortedPoliciesTWEDelimiter) + "baz" +
string(SortedPoliciesTWEDelimiter) + "foo" + string(ClientIDTWEDelimiter) + "namespaceFoo"
hashed := sha256.Sum256([]byte(expectedIDPlaintext))
expectedID := base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
// Test with entityID
entry = TokenEntry{EntityID: "entityFoo", NamespaceID: "namespaceFoo", Policies: []string{"bar", "baz", "foo", "banana"}}
id, isTWE = entry.CreateClientID()
if isTWE {
t.Fatalf("token with entity should return false value in isTWE bool")
}
if id != "entityFoo" {
t.Fatalf("client ID should be entity ID")
}
// Test without namespace
entry = TokenEntry{Policies: []string{"bar", "baz", "foo", "banana"}}
id, isTWE = entry.CreateClientID()
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext = "banana" + string(SortedPoliciesTWEDelimiter) + "bar" +
string(SortedPoliciesTWEDelimiter) + "baz" +
string(SortedPoliciesTWEDelimiter) + "foo" + string(ClientIDTWEDelimiter)
hashed = sha256.Sum256([]byte(expectedIDPlaintext))
expectedID = base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
// Test without policies
entry = TokenEntry{NamespaceID: "namespaceFoo"}
id, isTWE = entry.CreateClientID()
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext = "namespaceFoo"
hashed = sha256.Sum256([]byte(expectedIDPlaintext))
expectedID = base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
}

View File

@ -2,8 +2,6 @@ package vault
import ( import (
"context" "context"
"crypto/sha256"
"encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -63,17 +61,6 @@ const (
// Estimates as 8KiB / 64 bytes = 128 // Estimates as 8KiB / 64 bytes = 128
activityFragmentStandbyCapacity = 128 activityFragmentStandbyCapacity = 128
// Delimiter between the string fields used to generate a client
// ID for tokens without entities. This is the 0 character, which
// is a non-printable string. Please see unicode.IsPrint for details.
clientIDTWEDelimiter = rune('\x00')
// Delimiter between each policy in the sorted policies used to
// generate a client ID for tokens without entities. This is the 127
// character, which is a non-printable string. Please see unicode.IsPrint
// for details.
sortedPoliciesTWEDelimiter = rune('\x7F')
// trackedTWESegmentPeriod is a time period of a little over a month, and represents // trackedTWESegmentPeriod is a time period of a little over a month, and represents
// the amount of time that needs to pass after a 1.9 or later upgrade to result in // the amount of time that needs to pass after a 1.9 or later upgrade to result in
// all fragments and segments no longer storing token counts in the directtokens // all fragments and segments no longer storing token counts in the directtokens
@ -1591,74 +1578,29 @@ func (a *ActivityLog) loadConfigOrDefault(ctx context.Context) (activityConfig,
} }
// HandleTokenUsage adds the TokenEntry to the current fragment of the activity log // HandleTokenUsage adds the TokenEntry to the current fragment of the activity log
// and returns the corresponding Client ID.
// This currently occurs on token usage only. // This currently occurs on token usage only.
func (a *ActivityLog) HandleTokenUsage(entry *logical.TokenEntry) string { func (a *ActivityLog) HandleTokenUsage(entry *logical.TokenEntry, clientID string, isTWE bool) {
// First, check if a is enabled, so as to avoid the cost of creating an ID for // First, check if a is enabled, so as to avoid the cost of creating an ID for
// tokens without entities in the case where it not. // tokens without entities in the case where it not.
a.fragmentLock.RLock() a.fragmentLock.RLock()
if !a.enabled { if !a.enabled {
a.fragmentLock.RUnlock() a.fragmentLock.RUnlock()
return "" return
} }
a.fragmentLock.RUnlock() a.fragmentLock.RUnlock()
// Do not count wrapping tokens in client count // Do not count wrapping tokens in client count
if IsWrappingToken(entry) { if IsWrappingToken(entry) {
return "" return
} }
// Do not count root tokens in client count. // Do not count root tokens in client count.
if entry.IsRoot() { if entry.IsRoot() {
return "" return
} }
// Parse an entry's client ID and add it to the activity log // Parse an entry's client ID and add it to the activity log
clientID, isTWE := a.CreateClientID(entry)
a.AddClientToFragment(clientID, entry.NamespaceID, entry.CreationTime, isTWE) a.AddClientToFragment(clientID, entry.NamespaceID, entry.CreationTime, isTWE)
return clientID
}
// CreateClientID returns the client ID, and a boolean which is false if the clientID
// has an entity, and true otherwise
func (a *ActivityLog) CreateClientID(entry *logical.TokenEntry) (string, bool) {
var clientIDInputBuilder strings.Builder
// if entry has an associated entity ID, return it
if entry.EntityID != "" {
return entry.EntityID, false
}
// The entry is associated with a TWE (token without entity). In this case
// we must create a client ID by calculating the following formula:
// clientID = SHA256(sorted policies + namespace)
// Step 1: Copy entry policies to a new struct
sortedPolicies := make([]string, len(entry.Policies))
copy(sortedPolicies, entry.Policies)
// Step 2: Sort and join copied policies
sort.Strings(sortedPolicies)
for _, pol := range sortedPolicies {
clientIDInputBuilder.WriteRune(sortedPoliciesTWEDelimiter)
clientIDInputBuilder.WriteString(pol)
}
// Step 3: Add namespace ID
clientIDInputBuilder.WriteRune(clientIDTWEDelimiter)
clientIDInputBuilder.WriteString(entry.NamespaceID)
if clientIDInputBuilder.Len() == 0 {
a.logger.Error("vault token with no entity ID, policies, or namespace was recorded " +
"in the activity log")
return "", true
}
// Step 4: Remove the first character in the string, as it's an unnecessary delimiter
clientIDInput := clientIDInputBuilder.String()[1:]
// Step 5: Hash the sum
hashed := sha256.Sum256([]byte(clientIDInput))
return base64.StdEncoding.EncodeToString(hashed[:]), true
} }
func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string { func (a *ActivityLog) namespaceToLabel(ctx context.Context, nsID string) string {

View File

@ -2,8 +2,6 @@ package vault
import ( import (
"context" "context"
"crypto/sha256"
"encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -114,13 +112,16 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
const namespace_id = "ns123" const namespace_id = "ns123"
a.HandleTokenUsage(&logical.TokenEntry{ te := &logical.TokenEntry{
Path: "test", Path: "test",
Policies: []string{responseWrappingPolicyName}, Policies: []string{responseWrappingPolicyName},
CreationTime: time.Now().Unix(), CreationTime: time.Now().Unix(),
TTL: 3600, TTL: 3600,
NamespaceID: namespace_id, NamespaceID: namespace_id,
}) }
id, isTWE := te.CreateClientID()
a.HandleTokenUsage(te, id, isTWE)
a.fragmentLock.Lock() a.fragmentLock.Lock()
if a.fragment != nil { if a.fragment != nil {
@ -128,13 +129,16 @@ func TestActivityLog_Creation_WrappingTokens(t *testing.T) {
} }
a.fragmentLock.Unlock() a.fragmentLock.Unlock()
a.HandleTokenUsage(&logical.TokenEntry{ teNew := &logical.TokenEntry{
Path: "test", Path: "test",
Policies: []string{controlGroupPolicyName}, Policies: []string{controlGroupPolicyName},
CreationTime: time.Now().Unix(), CreationTime: time.Now().Unix(),
TTL: 3600, TTL: 3600,
NamespaceID: namespace_id, NamespaceID: namespace_id,
}) }
id, isTWE = teNew.CreateClientID()
a.HandleTokenUsage(teNew, id, isTWE)
a.fragmentLock.Lock() a.fragmentLock.Lock()
if a.fragment != nil { if a.fragment != nil {
@ -359,13 +363,15 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
tokenEntryOne := logical.TokenEntry{NamespaceID: "ns1_id", Policies: []string{"hi"}} tokenEntryOne := logical.TokenEntry{NamespaceID: "ns1_id", Policies: []string{"hi"}}
entityEntry := logical.TokenEntry{EntityID: "foo", NamespaceID: "ns1_id", Policies: []string{"hi"}} entityEntry := logical.TokenEntry{EntityID: "foo", NamespaceID: "ns1_id", Policies: []string{"hi"}}
id, _ := a.CreateClientID(&tokenEntryOne) idNonEntity, isTWE := tokenEntryOne.CreateClientID()
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
a.HandleTokenUsage(&tokenEntryOne) a.HandleTokenUsage(&tokenEntryOne, idNonEntity, isTWE)
} }
idEntity, isTWE := entityEntry.CreateClientID()
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
a.HandleTokenUsage(&entityEntry) a.HandleTokenUsage(&entityEntry, idEntity, isTWE)
} }
err := a.saveCurrentSegmentToStorage(ctx, false) err := a.saveCurrentSegmentToStorage(ctx, false)
if err != nil { if err != nil {
@ -394,14 +400,14 @@ func TestActivityLog_SaveTokensToStorageDoesNotUpdateTokenCount(t *testing.T) {
for _, client := range out.Clients { for _, client := range out.Clients {
if client.NonEntity == true { if client.NonEntity == true {
nonEntityTokenFlag = true nonEntityTokenFlag = true
if client.ClientID != id { if client.ClientID != idNonEntity {
t.Fatalf("expected a client ID of %s, but saved instead %s", id, client.ClientID) t.Fatalf("expected a client ID of %s, but saved instead %s", idNonEntity, client.ClientID)
} }
} }
if client.NonEntity == false { if client.NonEntity == false {
entityTokenFlag = true entityTokenFlag = true
if client.ClientID != "foo" { if client.ClientID != idEntity {
t.Fatalf("expected a client ID of %s, but saved instead %s", "foo", client.ClientID) t.Fatalf("expected a client ID of %s, but saved instead %s", idEntity, client.ClientID)
} }
} }
} }
@ -1520,65 +1526,6 @@ func TestActivityLog_refreshFromStoredLog(t *testing.T) {
} }
} }
// TestCreateClientID verifies that CreateClientID uses the entity ID for a token
// entry if one exists, and creates an appropriate client ID otherwise.
func TestCreateClientID(t *testing.T) {
entry := logical.TokenEntry{NamespaceID: "namespaceFoo", Policies: []string{"bar", "baz", "foo", "banana"}}
activityLog := ActivityLog{}
id, isTWE := activityLog.CreateClientID(&entry)
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext := "banana" + string(sortedPoliciesTWEDelimiter) + "bar" +
string(sortedPoliciesTWEDelimiter) + "baz" +
string(sortedPoliciesTWEDelimiter) + "foo" + string(clientIDTWEDelimiter) + "namespaceFoo"
hashed := sha256.Sum256([]byte(expectedIDPlaintext))
expectedID := base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
// Test with entityID
entry = logical.TokenEntry{EntityID: "entityFoo", NamespaceID: "namespaceFoo", Policies: []string{"bar", "baz", "foo", "banana"}}
id, isTWE = activityLog.CreateClientID(&entry)
if isTWE {
t.Fatalf("token with entity should return false value in isTWE bool")
}
if id != "entityFoo" {
t.Fatalf("client ID should be entity ID")
}
// Test without namespace
entry = logical.TokenEntry{Policies: []string{"bar", "baz", "foo", "banana"}}
id, isTWE = activityLog.CreateClientID(&entry)
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext = "banana" + string(sortedPoliciesTWEDelimiter) + "bar" +
string(sortedPoliciesTWEDelimiter) + "baz" +
string(sortedPoliciesTWEDelimiter) + "foo" + string(clientIDTWEDelimiter)
hashed = sha256.Sum256([]byte(expectedIDPlaintext))
expectedID = base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
// Test without policies
entry = logical.TokenEntry{NamespaceID: "namespaceFoo"}
id, isTWE = activityLog.CreateClientID(&entry)
if !isTWE {
t.Fatalf("TWE token should return true value in isTWE bool")
}
expectedIDPlaintext = "namespaceFoo"
hashed = sha256.Sum256([]byte(expectedIDPlaintext))
expectedID = base64.StdEncoding.EncodeToString(hashed[:])
if expectedID != id {
t.Fatalf("wrong ID: expected %s, found %s", expectedID, id)
}
}
func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) { func TestActivityLog_refreshFromStoredLogWithBackgroundLoadingCancelled(t *testing.T) {
a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true) a, expectedClientRecords, expectedTokenCounts := setupActivityRecordsInStorage(t, time.Now().UTC(), true, true)
a.SetEnable(true) a.SetEnable(true)

View File

@ -365,6 +365,9 @@ type Core struct {
// metrics emission and sealing leading to a nil pointer // metrics emission and sealing leading to a nil pointer
metricsMutex sync.Mutex metricsMutex sync.Mutex
// inFlightReqMap is used to store info about in-flight requests
inFlightReqData *InFlightRequests
// metricSink is the destination for all metrics that have // metricSink is the destination for all metrics that have
// a cluster label. // a cluster label.
metricSink *metricsutil.ClusterMetricSink metricSink *metricsutil.ClusterMetricSink
@ -386,6 +389,9 @@ type Core struct {
// disabled // disabled
physicalCache physical.ToggleablePurgemonster physicalCache physical.ToggleablePurgemonster
// logRequestsLevel indicates at which level requests should be logged
logRequestsLevel *uberAtomic.Int32
// reloadFuncs is a map containing reload functions // reloadFuncs is a map containing reload functions
reloadFuncs map[string][]reloadutil.ReloadFunc reloadFuncs map[string][]reloadutil.ReloadFunc
@ -848,6 +854,11 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
c.router.logger = c.logger.Named("router") c.router.logger = c.logger.Named("router")
c.allLoggers = append(c.allLoggers, c.router.logger) c.allLoggers = append(c.allLoggers, c.router.logger)
c.inFlightReqData = &InFlightRequests{
InFlightReqMap: &sync.Map{},
InFlightReqCount: uberAtomic.NewUint64(0),
}
c.SetConfig(conf.RawConfig) c.SetConfig(conf.RawConfig)
atomic.StoreUint32(c.replicationState, uint32(consts.ReplicationDRDisabled|consts.ReplicationPerformanceDisabled)) atomic.StoreUint32(c.replicationState, uint32(consts.ReplicationDRDisabled|consts.ReplicationPerformanceDisabled))
@ -1027,6 +1038,15 @@ func NewCore(conf *CoreConfig) (*Core, error) {
c.customListenerHeader.Store(([]*ListenerCustomHeaders)(nil)) c.customListenerHeader.Store(([]*ListenerCustomHeaders)(nil))
} }
logRequestsLevel := conf.RawConfig.LogRequestsLevel
c.logRequestsLevel = uberAtomic.NewInt32(0)
switch {
case log.LevelFromString(logRequestsLevel) > log.NoLevel && log.LevelFromString(logRequestsLevel) < log.Off:
c.logRequestsLevel.Store(int32(log.LevelFromString(logRequestsLevel)))
case logRequestsLevel != "":
c.logger.Warn("invalid log_requests_level", "level", conf.RawConfig.LogRequestsLevel)
}
quotasLogger := conf.Logger.Named("quotas") quotasLogger := conf.Logger.Named("quotas")
c.allLoggers = append(c.allLoggers, quotasLogger) c.allLoggers = append(c.allLoggers, quotasLogger)
c.quotaManager, err = quotas.NewManager(quotasLogger, c.quotaLeaseWalker, c.metricSink) c.quotaManager, err = quotas.NewManager(quotasLogger, c.quotaLeaseWalker, c.metricSink)
@ -2947,6 +2967,95 @@ type LicenseState struct {
Terminated bool Terminated bool
} }
type InFlightRequests struct {
InFlightReqMap *sync.Map
InFlightReqCount *uberAtomic.Uint64
}
type InFlightReqData struct {
StartTime time.Time `json:"start_time"`
ClientRemoteAddr string `json:"client_remote_address"`
ReqPath string `json:"request_path"`
Method string `json:"request_method"`
ClientID string `json:"client_id"`
}
func (c *Core) StoreInFlightReqData(reqID string, data InFlightReqData) {
c.inFlightReqData.InFlightReqMap.Store(reqID, data)
c.inFlightReqData.InFlightReqCount.Inc()
}
// FinalizeInFlightReqData is going log the completed request if the
// corresponding server config option is enabled. It also removes the
// request from the inFlightReqMap and decrement the number of in-flight
// requests by one.
func (c *Core) FinalizeInFlightReqData(reqID string, statusCode int) {
if c.logRequestsLevel != nil && c.logRequestsLevel.Load() != 0 {
c.LogCompletedRequests(reqID, statusCode)
}
c.inFlightReqData.InFlightReqMap.Delete(reqID)
c.inFlightReqData.InFlightReqCount.Dec()
}
// LoadInFlightReqData creates a snapshot map of the current
// in-flight requests
func (c *Core) LoadInFlightReqData() map[string]InFlightReqData {
currentInFlightReqMap := make(map[string]InFlightReqData)
c.inFlightReqData.InFlightReqMap.Range(func(key, value interface{}) bool {
// there is only one writer to this map, so skip checking for errors
v := value.(InFlightReqData)
currentInFlightReqMap[key.(string)] = v
return true
})
return currentInFlightReqMap
}
// UpdateInFlightReqData updates the data for a specific reqID with
// the clientID
func (c *Core) UpdateInFlightReqData(reqID, clientID string) {
v, ok := c.inFlightReqData.InFlightReqMap.Load(reqID)
if !ok {
c.Logger().Trace("failed to retrieve request with ID %v", reqID)
return
}
// there is only one writer to this map, so skip checking for errors
reqData := v.(InFlightReqData)
reqData.ClientID = clientID
c.inFlightReqData.InFlightReqMap.Store(reqID, reqData)
}
// LogCompletedRequests Logs the completed request to the server logs
func (c *Core) LogCompletedRequests(reqID string, statusCode int) {
logLevel := log.Level(c.logRequestsLevel.Load())
v, ok := c.inFlightReqData.InFlightReqMap.Load(reqID)
if !ok {
c.logger.Log(logLevel, fmt.Sprintf("failed to retrieve request with ID %v", reqID))
return
}
// there is only one writer to this map, so skip checking for errors
reqData := v.(InFlightReqData)
c.logger.Log(logLevel, "completed_request","client_id", reqData.ClientID, "client_address", reqData.ClientRemoteAddr, "status_code", statusCode, "request_path", reqData.ReqPath, "request_method", reqData.Method)
}
func (c *Core) ReloadLogRequestsLevel() {
conf := c.rawConfig.Load()
if conf == nil {
return
}
infoLevel := conf.(*server.Config).LogRequestsLevel
switch {
case log.LevelFromString(infoLevel) > log.NoLevel && log.LevelFromString(infoLevel) < log.Off:
c.logRequestsLevel.Store(int32(log.LevelFromString(infoLevel)))
case infoLevel != "":
c.logger.Warn("invalid log_requests_level", "level", infoLevel)
}
}
type PeerNode struct { type PeerNode struct {
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
APIAddress string `json:"api_address"` APIAddress string `json:"api_address"`
@ -2967,4 +3076,4 @@ func (c *Core) GetHAPeerNodesCached() []PeerNode {
}) })
} }
return nodes return nodes
} }

View File

@ -91,6 +91,9 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil) c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
} }
// Capture the total number of in-flight requests
c.inFlightReqGaugeMetric()
// Refresh gauge metrics that are looped // Refresh gauge metrics that are looped
c.cachedGaugeMetricsEmitter() c.cachedGaugeMetricsEmitter()
@ -534,3 +537,9 @@ func (c *Core) cachedGaugeMetricsEmitter() {
loopMetrics.Range(emit) loopMetrics.Range(emit)
} }
func (c *Core) inFlightReqGaugeMetric() {
totalInFlightReq := c.inFlightReqData.InFlightReqCount.Load()
// Adding a gauge metric to capture total number of inflight requests
c.metricSink.SetGaugeWithLabels([]string{"core", "in_flight_requests"}, float32(totalInFlightReq), nil)
}

View File

@ -177,6 +177,7 @@ func NewSystemBackend(core *Core, logger log.Logger) *SystemBackend {
b.Backend.Paths = append(b.Backend.Paths, b.remountPath()) b.Backend.Paths = append(b.Backend.Paths, b.remountPath())
b.Backend.Paths = append(b.Backend.Paths, b.metricsPath()) b.Backend.Paths = append(b.Backend.Paths, b.metricsPath())
b.Backend.Paths = append(b.Backend.Paths, b.monitorPath()) b.Backend.Paths = append(b.Backend.Paths, b.monitorPath())
b.Backend.Paths = append(b.Backend.Paths, b.inFlightRequestPath())
b.Backend.Paths = append(b.Backend.Paths, b.hostInfoPath()) b.Backend.Paths = append(b.Backend.Paths, b.hostInfoPath())
b.Backend.Paths = append(b.Backend.Paths, b.quotasPaths()...) b.Backend.Paths = append(b.Backend.Paths, b.quotasPaths()...)
b.Backend.Paths = append(b.Backend.Paths, b.rootActivityPaths()...) b.Backend.Paths = append(b.Backend.Paths, b.rootActivityPaths()...)
@ -3013,6 +3014,29 @@ func (b *SystemBackend) handleMetrics(ctx context.Context, req *logical.Request,
return b.Core.metricsHelper.ResponseForFormat(format), nil return b.Core.metricsHelper.ResponseForFormat(format), nil
} }
func (b *SystemBackend) handleInFlightRequestData(_ context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
resp := &logical.Response{
Data: map[string]interface{}{
logical.HTTPContentType: "text/plain",
logical.HTTPStatusCode: http.StatusInternalServerError,
},
}
currentInFlightReqMap := b.Core.LoadInFlightReqData()
content, err := json.Marshal(currentInFlightReqMap)
if err != nil {
resp.Data[logical.HTTPRawBody] = fmt.Sprintf("error while marshalling the in-flight requests data: %s", err)
return resp, nil
}
resp.Data[logical.HTTPContentType] = "application/json"
resp.Data[logical.HTTPRawBody] = content
resp.Data[logical.HTTPStatusCode] = http.StatusOK
return resp, nil
}
func (b *SystemBackend) handleMonitor(ctx context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) { func (b *SystemBackend) handleMonitor(ctx context.Context, req *logical.Request, data *framework.FieldData) (*logical.Response, error) {
ll := data.Get("log_level").(string) ll := data.Get("log_level").(string)
w := req.ResponseWriter w := req.ResponseWriter
@ -4912,6 +4936,14 @@ This path responds to the following HTTP methods.
"Export the metrics aggregated for telemetry purpose.", "Export the metrics aggregated for telemetry purpose.",
"", "",
}, },
"in-flight-req": {
"reports in-flight requests",
`
This path responds to the following HTTP methods.
GET /
Returns a map of in-flight requests.
`,
},
"internal-counters-requests": { "internal-counters-requests": {
"Currently unsupported. Previously, count of requests seen by this Vault cluster over time.", "Currently unsupported. Previously, count of requests seen by this Vault cluster over time.",
"Currently unsupported. Previously, count of requests seen by this Vault cluster over time. Not included in count: health checks, UI asset requests, requests forwarded from another cluster.", "Currently unsupported. Previously, count of requests seen by this Vault cluster over time. Not included in count: health checks, UI asset requests, requests forwarded from another cluster.",

View File

@ -1356,6 +1356,19 @@ func (b *SystemBackend) monitorPath() *framework.Path {
} }
} }
func (b *SystemBackend) inFlightRequestPath() *framework.Path {
return &framework.Path{
Pattern: "in-flight-req",
Operations: map[logical.Operation]framework.OperationHandler {
logical.ReadOperation: &framework.PathOperation{
Callback: b.handleInFlightRequestData,
Summary: strings.TrimSpace(sysHelp["in-flight-req"][0]),
Description: strings.TrimSpace(sysHelp["in-flight-req"][1]),
},
},
}
}
func (b *SystemBackend) hostInfoPath() *framework.Path { func (b *SystemBackend) hostInfoPath() *framework.Path {
return &framework.Path{ return &framework.Path{
Pattern: "host-info/?", Pattern: "host-info/?",

View File

@ -346,6 +346,8 @@ func (c *Core) checkToken(ctx context.Context, req *logical.Request, unauth bool
Accessor: req.ClientTokenAccessor, Accessor: req.ClientTokenAccessor,
} }
var clientID string
var isTWE bool
if te != nil { if te != nil {
auth.IdentityPolicies = identityPolicies[te.NamespaceID] auth.IdentityPolicies = identityPolicies[te.NamespaceID]
auth.TokenPolicies = te.Policies auth.TokenPolicies = te.Policies
@ -362,6 +364,8 @@ func (c *Core) checkToken(ctx context.Context, req *logical.Request, unauth bool
if te.CreationTime > 0 { if te.CreationTime > 0 {
auth.IssueTime = time.Unix(te.CreationTime, 0) auth.IssueTime = time.Unix(te.CreationTime, 0)
} }
clientID, isTWE = te.CreateClientID()
req.ClientID = clientID
} }
// Check the standard non-root ACLs. Return the token entry if it's not // Check the standard non-root ACLs. Return the token entry if it's not
@ -398,7 +402,7 @@ func (c *Core) checkToken(ctx context.Context, req *logical.Request, unauth bool
// If it is an authenticated ( i.e with vault token ) request, increment client count // If it is an authenticated ( i.e with vault token ) request, increment client count
if !unauth && c.activityLog != nil { if !unauth && c.activityLog != nil {
req.ClientID = c.activityLog.HandleTokenUsage(te) c.activityLog.HandleTokenUsage(te, clientID, isTWE)
} }
return auth, te, nil return auth, te, nil
} }
@ -439,7 +443,12 @@ func (c *Core) switchedLockHandleRequest(httpCtx context.Context, req *logical.R
return nil, fmt.Errorf("could not parse namespace from http context: %w", err) return nil, fmt.Errorf("could not parse namespace from http context: %w", err)
} }
resp, err = c.handleCancelableRequest(namespace.ContextWithNamespace(ctx, ns), req) ctx = namespace.ContextWithNamespace(ctx, ns)
inFlightReqID, ok := httpCtx.Value(logical.CtxKeyInFlightRequestID{}).(string)
if ok {
ctx = context.WithValue(ctx, logical.CtxKeyInFlightRequestID{}, inFlightReqID)
}
resp, err = c.handleCancelableRequest(ctx, req)
req.SetTokenEntry(nil) req.SetTokenEntry(nil)
cancel() cancel()
@ -775,6 +784,12 @@ func (c *Core) handleRequest(ctx context.Context, req *logical.Request) (retResp
return nil, nil, ctErr return nil, nil, ctErr
} }
// Updating in-flight request data with client/entity ID
inFlightReqID, ok := ctx.Value(logical.CtxKeyInFlightRequestID{}).(string)
if ok && req.ClientID != "" {
c.UpdateInFlightReqData(inFlightReqID, req.ClientID)
}
// We run this logic first because we want to decrement the use count even // We run this logic first because we want to decrement the use count even
// in the case of an error (assuming we can successfully look up; if we // in the case of an error (assuming we can successfully look up; if we
// need to forward, we exit before now) // need to forward, we exit before now)
@ -1168,6 +1183,13 @@ func (c *Core) handleLoginRequest(ctx context.Context, req *logical.Request) (re
if ctErr == logical.ErrPerfStandbyPleaseForward { if ctErr == logical.ErrPerfStandbyPleaseForward {
return nil, nil, ctErr return nil, nil, ctErr
} }
// Updating in-flight request data with client/entity ID
inFlightReqID, ok := ctx.Value(logical.CtxKeyInFlightRequestID{}).(string)
if ok && req.ClientID != "" {
c.UpdateInFlightReqData(inFlightReqID, req.ClientID)
}
if ctErr != nil { if ctErr != nil {
// If it is an internal error we return that, otherwise we // If it is an internal error we return that, otherwise we
// return invalid request so that the status codes can be correct // return invalid request so that the status codes can be correct

View File

@ -0,0 +1,41 @@
---
layout: api
page_title: /sys/in-flight-req - HTTP API
description: The `/sys/in-flight-req` endpoint is used to get information on in-flight requests.
---
# `/sys/in-flight-req`
The `/sys/in-flight-req` endpoint is used to get information on in-flight requests.
The returned information contains the `start_time`, `client_remote_address`, `request_path`,
`request_method`, and `client_id` of the in-flight requests.
## Collect In-Flight Request Information
This endpoint returns the information about the in-flight requests.
| Method | Path |
| :----- | :---------- |
| `GET` | `/sys/in-flight-req` |
### Sample Request
```shell-session
$ curl \
--header "X-Vault-Token: ..." \
http://127.0.0.1:8200/v1/sys/in-flight-req
```
### Sample Response
```json
{
"9049326b-ceed-1033-c099-96c5cc97db1f": {
"start_time": "2021-11-19T09:13:01.34157-08:00",
"client_remote_address": "127.0.0.3:49816",
"request_path": "/v1/sys/in-flight-req",
"request_method": "GET",
"client_id": "",
}
}
```

View File

@ -183,6 +183,8 @@ default value in the `"/sys/config/ui"` [API endpoint](/api/system/config-ui).
- `unauthenticated_pprof_access` `(bool: false)` - If set to true, allows - `unauthenticated_pprof_access` `(bool: false)` - If set to true, allows
unauthenticated access to the `/v1/sys/pprof` endpoint. unauthenticated access to the `/v1/sys/pprof` endpoint.
- `unauthenticated_in_flight_request_access` `(bool: false)` - If set to true, allows
unauthenticated access to the `/v1/sys/in-flight-req` endpoint.
### `custom_response_headers` Parameters ### `custom_response_headers` Parameters
@ -253,6 +255,7 @@ This example shows enabling unauthenticated profiling access.
listener "tcp" { listener "tcp" {
profiling { profiling {
unauthenticated_pprof_access = true unauthenticated_pprof_access = true
unauthenticated_in_flight_request_access = true
} }
} }
``` ```

View File

@ -0,0 +1,31 @@
---
layout: docs
page_title: Log Completed Requests - Configuration
description: |-
Vault can be configured to log completed requests.
---
# Log Completed Requests
Vault can be configured to log completed requests using the `log_requests_level` configuration parameter.
## Activating the Log Completed Requests
By default, logging completed requests is disabled. To activate the requests logging, set the `log_requests_level`
configuration option in the Vault server configuration to the desired logging level. The acceptable logging levels are
`error`, `warn`, `info`, `debug`, and `trace`.
If the vault server is already running, you can still configure the parameter in the Vault server configuration,
and then send an `SIGHUP` signal to the vault process.
```hcl
log_requests_level = "trace"
listener "tcp" {
# ...
}
```
## Deactivating the Log Completed Requests
To deactivate logging completed requests, simply remove the `log_requests_level`
configuration parameter from the vault server configuration, and send a `SIGHUP` signal to the vault process.

View File

@ -91,6 +91,7 @@ These metrics represent operational aspects of the running Vault instance.
| `vault.core.fetch_acl_and_token` | Duration of time taken by ACL and corresponding token entry fetches handled by Vault core | ms | summary | | `vault.core.fetch_acl_and_token` | Duration of time taken by ACL and corresponding token entry fetches handled by Vault core | ms | summary |
| `vault.core.handle_request` | Duration of time taken by requests handled by Vault core | ms | summary | | `vault.core.handle_request` | Duration of time taken by requests handled by Vault core | ms | summary |
| `vault.core.handle_login_request` | Duration of time taken by login requests handled by Vault core | ms | summary | | `vault.core.handle_login_request` | Duration of time taken by login requests handled by Vault core | ms | summary |
| `vault.core.in_flight_requests` | Number of in-flight requests. | requests | gauge |
| `vault.core.leadership_setup_failed` | Duration of time taken by cluster leadership setup failures which have occurred in a highly available Vault cluster. This should be monitored and alerted on for overall cluster leadership status. | ms | summary | | `vault.core.leadership_setup_failed` | Duration of time taken by cluster leadership setup failures which have occurred in a highly available Vault cluster. This should be monitored and alerted on for overall cluster leadership status. | ms | summary |
| `vault.core.leadership_lost` | Duration of time taken by cluster leadership losses which have occurred in a highly available Vault cluster. This should be monitored and alerted on for overall cluster leadership status. | ms | summary | | `vault.core.leadership_lost` | Duration of time taken by cluster leadership losses which have occurred in a highly available Vault cluster. This should be monitored and alerted on for overall cluster leadership status. | ms | summary |
| `vault.core.license.expiration_time_epoch` | Time as epoch (seconds since Jan 1 1970) at which license will expire. | seconds | gauge | | `vault.core.license.expiration_time_epoch` | Time as epoch (seconds since Jan 1 1970) at which license will expire. | seconds | gauge |

View File

@ -397,6 +397,10 @@
"title": "<code>/sys/host-info</code>", "title": "<code>/sys/host-info</code>",
"path": "system/host-info" "path": "system/host-info"
}, },
{
"title": "<code>/sys/in-flight-req</code>",
"path": "system/in-flight-req"
},
{ {
"title": "<code>/sys/init</code>", "title": "<code>/sys/init</code>",
"path": "system/init" "path": "system/init"

View File

@ -366,6 +366,10 @@
"title": "<code>ui</code>", "title": "<code>ui</code>",
"path": "configuration/ui" "path": "configuration/ui"
}, },
{
"title": "<code>Log Completed Requests</code>",
"path": "configuration/log-requests-level"
},
{ {
"title": "<code>Entropy Augmentation</code> <sup>ENT</sup>", "title": "<code>Entropy Augmentation</code> <sup>ENT</sup>",
"path": "configuration/entropy-augmentation" "path": "configuration/entropy-augmentation"