Add code for writing and reading request counters to storage. (#5918)

Increment a counter whenever a request is received. 
The in-memory counter is persisted to counters/requests/YYYY/MM.
When the month wraps around, we reset the in-memory counter to
zero.
Add an endpoint for querying the request counters across all time.
This commit is contained in:
ncabatoff 2019-03-05 14:55:07 -05:00 committed by GitHub
parent d8f39d54c9
commit cd747c9318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 390 additions and 13 deletions

View File

@ -423,6 +423,9 @@ type Core struct {
// Telemetry objects
metricsHelper *metricsutil.MetricsHelper
// Stores request counters
counters counters
}
// CoreConfig is used to parameterize a core
@ -494,6 +497,8 @@ type CoreConfig struct {
// Telemetry objects
MetricsHelper *metricsutil.MetricsHelper
CounterSyncInterval time.Duration
}
func (c *CoreConfig) Clone() *CoreConfig {
@ -526,6 +531,7 @@ func (c *CoreConfig) Clone() *CoreConfig {
DisablePerformanceStandby: c.DisablePerformanceStandby,
DisableIndexing: c.DisableIndexing,
AllLoggers: c.AllLoggers,
CounterSyncInterval: c.CounterSyncInterval,
}
}
@ -564,6 +570,11 @@ func NewCore(conf *CoreConfig) (*Core, error) {
conf.Logger = logging.NewVaultLogger(log.Trace)
}
syncInterval := conf.CounterSyncInterval
if syncInterval.Nanoseconds() == 0 {
syncInterval = 30 * time.Second
}
// Setup the core
c := &Core{
entCore: entCore{},
@ -600,6 +611,10 @@ func NewCore(conf *CoreConfig) (*Core, error) {
neverBecomeActive: new(uint32),
clusterLeaderParams: new(atomic.Value),
metricsHelper: conf.MetricsHelper,
counters: counters{
requests: new(uint64),
syncInterval: syncInterval,
},
}
atomic.StoreUint32(c.sealed, 1)
@ -1459,6 +1474,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
if err := c.loadCORSConfig(ctx); err != nil {
return err
}
if err := c.loadCurrentRequestCounters(ctx, time.Now()); err != nil {
return err
}
if err := c.loadCredentials(ctx); err != nil {
return err
}
@ -1631,14 +1649,28 @@ func stopReplicationImpl(c *Core) error {
// emitMetrics is used to periodically expose metrics while running
func (c *Core) emitMetrics(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
writeTimer := time.Tick(c.counters.syncInterval)
for {
select {
case <-time.After(time.Second):
case <-emitTimer:
c.metricsMutex.Lock()
if c.expiration != nil {
c.expiration.emitMetrics()
}
c.metricsMutex.Unlock()
case <-writeTimer:
if c.perfStandby {
syncCounter(c)
} else {
err := c.saveCurrentRequestCounters(context.Background(), time.Now())
if err != nil {
c.logger.Error("writing request counters to barrier", "err", err)
}
}
case <-stopCh:
return
}

175
vault/counters.go Normal file
View File

@ -0,0 +1,175 @@
package vault
import (
"context"
"sort"
"sync/atomic"
"time"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/logical"
)
const (
requestCounterDatePathFormat = "2006/01"
countersPath = systemBarrierPrefix + "counters"
requestCountersRelPath = "counters/requests/"
)
type counters struct {
// requests counts requests seen by Vault this month; does not include requests
// excluded by design, e.g. health checks and UI asset requests.
requests *uint64
// activePath is set at startup to the path we primed the requests counter from,
// or empty string if there wasn't a relevant path - either because this is the first
// time Vault starts with the feature enabled, or because Vault hadn't written
// out the request counter this month yet.
// Whenever we write out the counters, we update activePath if it's no longer
// accurate. This coincides with a reset of the counters.
// There's no lock because the only reader/writer of activePath is the goroutine
// doing background syncs.
activePath string
// syncInterval determines how often the counters get written to storage (on primary)
// or synced to primary.
syncInterval time.Duration
}
// RequestCounter stores the state of request counters for a single unspecified period.
type RequestCounter struct {
// Total is the number of requests seen during a given period.
Total *uint64 `json:"total"`
}
// DatedRequestCounter holds request counters from a single period of time.
type DatedRequestCounter struct {
// StartTime is when the period starts.
StartTime time.Time `json:"start_time"`
// RequestCounter counts requests.
RequestCounter
}
// loadAllRequestCounters returns all request counters found in storage,
// ordered by time (oldest first.)
func (c *Core) loadAllRequestCounters(ctx context.Context, now time.Time) ([]DatedRequestCounter, error) {
view := c.systemBarrierView.SubView(requestCountersRelPath)
datepaths, err := view.List(ctx, "")
if err != nil {
return nil, errwrap.Wrapf("failed to read request counters: {{err}}", err)
}
var all []DatedRequestCounter
sort.Strings(datepaths)
for _, datepath := range datepaths {
datesubpaths, err := view.List(ctx, datepath)
if err != nil {
return nil, errwrap.Wrapf("failed to read request counters: {{err}}", err)
}
sort.Strings(datesubpaths)
for _, datesubpath := range datesubpaths {
fullpath := datepath + datesubpath
counter, err := c.loadRequestCounters(ctx, fullpath)
if err != nil {
return nil, err
}
t, err := time.Parse(requestCounterDatePathFormat, fullpath)
if err != nil {
return nil, err
}
all = append(all, DatedRequestCounter{StartTime: t, RequestCounter: *counter})
}
}
start, _ := time.Parse(requestCounterDatePathFormat, now.Format(requestCounterDatePathFormat))
idx := sort.Search(len(all), func(i int) bool {
return !all[i].StartTime.Before(start)
})
cur := atomic.LoadUint64(c.counters.requests)
if idx < len(all) {
all[idx].RequestCounter.Total = &cur
} else {
all = append(all, DatedRequestCounter{StartTime: start, RequestCounter: RequestCounter{Total: &cur}})
}
return all, nil
}
// loadCurrentRequestCounters reads the current RequestCounter out of storage.
// The in-memory current request counter is populated with the value read, if any.
// now should be the current time; it is a parameter to facilitate testing.
func (c *Core) loadCurrentRequestCounters(ctx context.Context, now time.Time) error {
datepath := now.Format(requestCounterDatePathFormat)
counter, err := c.loadRequestCounters(ctx, datepath)
if err != nil {
return err
}
if counter != nil {
c.counters.activePath = datepath
atomic.StoreUint64(c.counters.requests, *counter.Total)
}
return nil
}
// loadRequestCounters reads a RequestCounter out of storage at location datepath.
// If nothing is found at that path, that isn't an error: a reference to a zero
// RequestCounter is returned.
func (c *Core) loadRequestCounters(ctx context.Context, datepath string) (*RequestCounter, error) {
view := c.systemBarrierView.SubView(requestCountersRelPath)
out, err := view.Get(ctx, datepath)
if err != nil {
return nil, errwrap.Wrapf("failed to read request counters: {{err}}", err)
}
if out == nil {
return nil, nil
}
newCounters := &RequestCounter{}
err = out.DecodeJSON(newCounters)
if err != nil {
return nil, err
}
return newCounters, nil
}
// saveCurrentRequestCounters writes the current RequestCounter to storage.
// The in-memory current request counter is reset to zero after writing if
// we've entered a new month.
// now should be the current time; it is a parameter to facilitate testing.
func (c *Core) saveCurrentRequestCounters(ctx context.Context, now time.Time) error {
view := c.systemBarrierView.SubView(requestCountersRelPath)
requests := atomic.LoadUint64(c.counters.requests)
curDatePath := now.Format(requestCounterDatePathFormat)
// If activePath is empty string, we were started with nothing in storage
// for the current month, so we should not reset the in-mem counter.
// But if activePath is nonempty and not curDatePath, we should reset.
shouldReset, writeDatePath := false, curDatePath
if c.counters.activePath != "" && c.counters.activePath != curDatePath {
shouldReset, writeDatePath = true, c.counters.activePath
}
localCounters := &RequestCounter{
Total: &requests,
}
entry, err := logical.StorageEntryJSON(writeDatePath, localCounters)
if err != nil {
return errwrap.Wrapf("failed to create request counters entry: {{err}}", err)
}
if err := view.Put(ctx, entry); err != nil {
return errwrap.Wrapf("failed to save request counters: {{err}}", err)
}
if shouldReset {
atomic.StoreUint64(c.counters.requests, 0)
}
if c.counters.activePath != curDatePath {
c.counters.activePath = curDatePath
}
return nil
}

122
vault/counters_test.go Normal file
View File

@ -0,0 +1,122 @@
package vault
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/go-test/deep"
)
//noinspection SpellCheckingInspection
func testParseTime(t *testing.T, format, timeval string) time.Time {
t.Helper()
tm, err := time.Parse(format, timeval)
if err != nil {
t.Fatalf("Error parsing time '%s': %v", timeval, err)
}
return tm
}
// TestRequestCounterLoadCurrent exercises the code that primes the in-mem
// request counters from persistent storage.
func TestRequestCounterLoadCurrent(t *testing.T) {
c, _, _ := TestCoreUnsealed(t)
december2018 := testParseTime(t, time.RFC3339, "2018-12-05T09:44:12-05:00")
decemberRequests := uint64(555)
// It's December, and we got some requests. Persist the counter.
atomic.StoreUint64(c.counters.requests, decemberRequests)
err := c.saveCurrentRequestCounters(context.Background(), december2018)
if err != nil {
t.Fatal(err)
}
// It's still December, simulate being restarted. At startup the counter is
// zero initially, until we read the counter from storage post-unseal via
// loadCurrentRequestCounters.
atomic.StoreUint64(c.counters.requests, 0)
err = c.loadCurrentRequestCounters(context.Background(), december2018)
if err != nil {
t.Fatal(err)
}
if got := atomic.LoadUint64(c.counters.requests); got != decemberRequests {
t.Fatalf("expected=%d, got=%d", decemberRequests, got)
}
// Now simulate being restarted in January. We never wrote anything out during
// January, so the in-mem counter should remain zero.
january2019 := testParseTime(t, time.RFC3339, "2019-01-02T08:21:11-05:00")
atomic.StoreUint64(c.counters.requests, 0)
err = c.loadCurrentRequestCounters(context.Background(), january2019)
if err != nil {
t.Fatal(err)
}
if got := atomic.LoadUint64(c.counters.requests); got != 0 {
t.Fatalf("expected=%d, got=%d", 0, got)
}
}
// TestRequestCounterSaveCurrent exercises the code that saves the in-mem
// request counters to persistent storage.
func TestRequestCounterSaveCurrent(t *testing.T) {
c, _, _ := TestCoreUnsealed(t)
// storeSaveLoad stores newValue in the in-mem counter, saves it to storage,
// then verifies in-mem counter has value expectedPostLoad.
storeSaveLoad := func(newValue, expectedPostLoad uint64, now time.Time) {
t.Helper()
atomic.StoreUint64(c.counters.requests, newValue)
err := c.saveCurrentRequestCounters(context.Background(), now)
if err != nil {
t.Fatal(err)
}
if got := atomic.LoadUint64(c.counters.requests); got != expectedPostLoad {
t.Fatalf("expected=%d, got=%d", expectedPostLoad, got)
}
}
// Start in December. The first write ever should persist the current in-mem value.
december2018 := testParseTime(t, time.RFC3339, "2018-12-05T09:44:12-05:00")
decemberRequests := uint64(555)
storeSaveLoad(decemberRequests, decemberRequests, december2018)
// Update request count.
decemberRequests++
storeSaveLoad(decemberRequests, decemberRequests, december2018)
decemberStartTime := testParseTime(t, requestCounterDatePathFormat, december2018.Format(requestCounterDatePathFormat))
expected2018 := []DatedRequestCounter{
{StartTime: decemberStartTime, RequestCounter: RequestCounter{Total: &decemberRequests}},
}
all, err := c.loadAllRequestCounters(context.Background(), december2018)
if err != nil {
t.Fatal(err)
}
if diff := deep.Equal(all, expected2018); len(diff) != 0 {
t.Errorf("Expected=%v, got=%v, diff=%v", expected2018, all, diff)
}
// Now it's January. Saving after transition to new month should reset in-mem
// counter to zero, and also write zero to storage for the new month.
january2019 := testParseTime(t, time.RFC3339, "2019-01-02T08:21:11-05:00")
decemberRequests += 5
storeSaveLoad(decemberRequests, 0, january2019)
januaryRequests := uint64(333)
storeSaveLoad(januaryRequests, januaryRequests, january2019)
all, err = c.loadAllRequestCounters(context.Background(), january2019)
if err != nil {
t.Fatal(err)
}
januaryStartTime := testParseTime(t, requestCounterDatePathFormat, january2019.Format(requestCounterDatePathFormat))
expected2019 := expected2018
expected2019 = append(expected2019,
DatedRequestCounter{januaryStartTime, RequestCounter{&januaryRequests}})
if diff := deep.Equal(all, expected2019); len(diff) != 0 {
t.Errorf("Expected=%v, got=%v, diff=%v", expected2019, all, diff)
}
}

View File

@ -2984,6 +2984,21 @@ func (b *SystemBackend) pathInternalUIMountRead(ctx context.Context, req *logica
return resp, nil
}
func (b *SystemBackend) pathInternalCountersRequests(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
counters, err := b.Core.loadAllRequestCounters(ctx, time.Now())
if err != nil {
return nil, err
}
resp := &logical.Response{
Data: map[string]interface{}{
"counters": counters,
},
}
return resp, nil
}
func (b *SystemBackend) pathInternalUIResultantACL(ctx context.Context, req *logical.Request, d *framework.FieldData) (*logical.Response, error) {
if req.ClientToken == "" {
// 204 -- no ACL
@ -3898,4 +3913,8 @@ This path responds to the following HTTP methods.
"Export the metrics aggregated for telemetry purpose.",
"",
},
"internal-counters-requests": {
"Count of requests seen by this Vault cluster over time.",
"Count of requests seen by this Vault cluster over time. Not included in count: health checks, UI asset requests, requests forwarded from another cluster.",
},
}

View File

@ -824,6 +824,17 @@ func (b *SystemBackend) internalPaths() []*framework.Path {
HelpSynopsis: strings.TrimSpace(sysHelp["internal-ui-resultant-acl"][0]),
HelpDescription: strings.TrimSpace(sysHelp["internal-ui-resultant-acl"][1]),
},
{
Pattern: "internal/counters/requests",
Operations: map[logical.Operation]framework.OperationHandler{
logical.ReadOperation: &framework.PathOperation{
Callback: b.pathInternalCountersRequests,
Unpublished: true,
},
},
HelpSynopsis: strings.TrimSpace(sysHelp["internal-counters-requests"][0]),
HelpDescription: strings.TrimSpace(sysHelp["internal-counters-requests"][1]),
},
}
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
@ -529,6 +530,18 @@ func isControlGroupRun(req *logical.Request) bool {
return req.ControlGroup != nil
}
func (c *Core) doRouting(ctx context.Context, req *logical.Request) (*logical.Response, error) {
// If we're replicating and we get a read-only error from a backend, need to forward to primary
resp, err := c.router.Route(ctx, req)
if err != nil {
if shouldForward(c, err) {
return forward(ctx, c, req)
}
}
atomic.AddUint64(c.counters.requests, 1)
return resp, err
}
func (c *Core) handleRequest(ctx context.Context, req *logical.Request) (retResp *logical.Response, retAuth *logical.Auth, retErr error) {
defer metrics.MeasureSince([]string{"core", "handle_request"}, time.Now())
@ -662,12 +675,9 @@ func (c *Core) handleRequest(ctx context.Context, req *logical.Request) (retResp
}
// Route the request
resp, routeErr := c.router.Route(ctx, req)
// If we're replicating and we get a read-only error from a backend, need to forward to primary
if routeErr != nil {
resp, routeErr = possiblyForward(ctx, c, req, resp, routeErr)
}
resp, routeErr := c.doRouting(ctx, req)
if resp != nil {
// If wrapping is used, use the shortest between the request and response
var wrapTTL time.Duration
var wrapFormat, creationPath string
@ -943,11 +953,7 @@ func (c *Core) handleLoginRequest(ctx context.Context, req *logical.Request) (re
}
// Route the request
resp, routeErr := c.router.Route(ctx, req)
// If we're replicating and we get a read-only error from a backend, need to forward to primary
if routeErr != nil {
resp, routeErr = possiblyForward(ctx, c, req, resp, routeErr)
}
resp, routeErr := c.doRouting(ctx, req)
if resp != nil {
// If wrapping is used, use the shortest between the request and response
var wrapTTL time.Duration

View File

@ -15,8 +15,19 @@ func checkNeedsCG(context.Context, *Core, *logical.Request, *logical.Auth, error
return nil, nil, nil, nil
}
func possiblyForward(ctx context.Context, c *Core, req *logical.Request, resp *logical.Response, routeErr error) (*logical.Response, error) {
return resp, routeErr
func shouldForward(c *Core, routeErr error) bool {
return false
}
func syncCounter(c *Core) {
}
func couldForward(c *Core) bool {
return false
}
func forward(ctx context.Context, c *Core, req *logical.Request) (*logical.Response, error) {
panic("forward called in OSS Vault")
}
func getLeaseRegisterFunc(c *Core) (func(context.Context, *logical.Request, *logical.Response) (string, error), error) {

View File

@ -1292,6 +1292,7 @@ func NewTestCluster(t testing.T, base *CoreConfig, opts *TestClusterOptions) *Te
coreConfig.DisableCache = base.DisableCache
coreConfig.DevToken = base.DevToken
coreConfig.CounterSyncInterval = base.CounterSyncInterval
}
if coreConfig.Physical == nil {