From 734e80ca560730b3319c7ce56faf1c17549f177f Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Mon, 15 Aug 2016 19:45:06 -0400 Subject: [PATCH] Add permit pool to dynamodb --- physical/dynamodb.go | 42 ++++++++++++++++++------ website/source/docs/config/index.html.md | 11 ++++--- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/physical/dynamodb.go b/physical/dynamodb.go index e11d99ba2..bb7b42ff0 100644 --- a/physical/dynamodb.go +++ b/physical/dynamodb.go @@ -21,6 +21,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/hashicorp/errwrap" ) const ( @@ -61,11 +62,12 @@ const ( // a DynamoDB table. It can be run in high-availability mode // as DynamoDB has locking capabilities. type DynamoDBBackend struct { - table string - client *dynamodb.DynamoDB - recovery bool - logger *log.Logger - haEnabled bool + table string + client *dynamodb.DynamoDB + recovery bool + logger *log.Logger + haEnabled bool + permitPool *PermitPool } // DynamoDBRecord is the representation of a vault entry in @@ -184,12 +186,23 @@ func newDynamoDBBackend(conf map[string]string, logger *log.Logger) (Backend, er } recoveryModeBool, _ := strconv.ParseBool(recoveryMode) + maxParStr, ok := conf["max_parallel"] + var maxParInt int + if ok { + maxParInt, err = strconv.Atoi(maxParStr) + if err != nil { + return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err) + } + logger.Printf("[DEBUG]: physical/consul: max_parallel set to %d", maxParInt) + } + return &DynamoDBBackend{ - table: table, - client: client, - recovery: recoveryModeBool, - haEnabled: haEnabledBool, - logger: logger, + table: table, + client: client, + permitPool: NewPermitPool(maxParInt), + recovery: recoveryModeBool, + haEnabled: haEnabledBool, + logger: logger, }, nil } @@ -235,6 +248,9 @@ func (d *DynamoDBBackend) Put(entry *Entry) error { func (d *DynamoDBBackend) Get(key string) (*Entry, error) { defer metrics.MeasureSince([]string{"dynamodb", "get"}, time.Now()) + d.permitPool.Acquire() + defer d.permitPool.Release() + resp, err := d.client.GetItem(&dynamodb.GetItemInput{ TableName: aws.String(d.table), ConsistentRead: aws.Bool(true), @@ -318,6 +334,10 @@ func (d *DynamoDBBackend) List(prefix string) ([]string, error) { }, }, } + + d.permitPool.Acquire() + defer d.permitPool.Release() + err := d.client.QueryPages(queryInput, func(out *dynamodb.QueryOutput, lastPage bool) bool { var record DynamoDBRecord for _, item := range out.Items { @@ -357,11 +377,13 @@ func (d *DynamoDBBackend) batchWriteRequests(requests []*dynamodb.WriteRequest) batch := requests[:batchSize] requests = requests[batchSize:] + d.permitPool.Acquire() _, err := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{ RequestItems: map[string][]*dynamodb.WriteRequest{ d.table: batch, }, }) + d.permitPool.Release() if err != nil { return err } diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index d37ca67c0..189609820 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -265,8 +265,8 @@ For Consul, the following options are supported: * `token` (optional) - An access token to use to write data to Consul. - * `max_parallel` (optional) - The maximum number of concurrent connections to Consul. - Defaults to "128". + * `max_parallel` (optional) - The maximum number of concurrent requests to Consul. + Defaults to `"128"`. * `tls_skip_verify` (optional) - If non-empty, then TLS host verification will be disabled for Consul communication. Defaults to false. @@ -506,6 +506,9 @@ The DynamoDB backend has the following options: `AWS_DEFAULT_REGION` environment variable and will default to `us-east-1` if not specified. + * `max_parallel` (optional) - The maximum number of concurrent requests to + DynamoDB. Defaults to `"128"`. + * `ha_enabled` (optional) - Setting this to `"1"`, `"t"`, or `"true"` will enable HA mode. Please ensure you have read the documentation for the `recovery_mode` option before enabling this. This option can also be @@ -563,7 +566,7 @@ profile enabled. Vault will handle renewing profile credentials as they rotate. * `container` (required) - The Azure Storage Blob container name - * `max_parallel` (optional) - The maximum number of concurrent connections to Azure. Defaults to "128". + * `max_parallel` (optional) - The maximum number of concurrent requests to Azure. Defaults to `"128"`. The current implementation is limited to a maximum of 4 MBytes per blob/file. @@ -581,7 +584,7 @@ For Swift, the following options are supported: * `tenant` (optional) - The name of Tenant to use. It can be sourced from the `OS_TENANT_NAME` environment variable and will default to default tenant of for the username if not specified. - * `max_parallel` (optional) - The maximum number of concurrent connections to Swift. Defaults to "128". + * `max_parallel` (optional) - The maximum number of concurrent requests to Swift. Defaults to `"128"`. #### Backend Reference: MySQL (Community-Supported)