Add permit pool to dynamodb
This commit is contained in:
parent
638e61192a
commit
734e80ca56
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
|
||||
"github.com/hashicorp/errwrap"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -61,11 +62,12 @@ const (
|
|||
// a DynamoDB table. It can be run in high-availability mode
|
||||
// as DynamoDB has locking capabilities.
|
||||
type DynamoDBBackend struct {
|
||||
table string
|
||||
client *dynamodb.DynamoDB
|
||||
recovery bool
|
||||
logger *log.Logger
|
||||
haEnabled bool
|
||||
table string
|
||||
client *dynamodb.DynamoDB
|
||||
recovery bool
|
||||
logger *log.Logger
|
||||
haEnabled bool
|
||||
permitPool *PermitPool
|
||||
}
|
||||
|
||||
// DynamoDBRecord is the representation of a vault entry in
|
||||
|
@ -184,12 +186,23 @@ func newDynamoDBBackend(conf map[string]string, logger *log.Logger) (Backend, er
|
|||
}
|
||||
recoveryModeBool, _ := strconv.ParseBool(recoveryMode)
|
||||
|
||||
maxParStr, ok := conf["max_parallel"]
|
||||
var maxParInt int
|
||||
if ok {
|
||||
maxParInt, err = strconv.Atoi(maxParStr)
|
||||
if err != nil {
|
||||
return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err)
|
||||
}
|
||||
logger.Printf("[DEBUG]: physical/consul: max_parallel set to %d", maxParInt)
|
||||
}
|
||||
|
||||
return &DynamoDBBackend{
|
||||
table: table,
|
||||
client: client,
|
||||
recovery: recoveryModeBool,
|
||||
haEnabled: haEnabledBool,
|
||||
logger: logger,
|
||||
table: table,
|
||||
client: client,
|
||||
permitPool: NewPermitPool(maxParInt),
|
||||
recovery: recoveryModeBool,
|
||||
haEnabled: haEnabledBool,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -235,6 +248,9 @@ func (d *DynamoDBBackend) Put(entry *Entry) error {
|
|||
func (d *DynamoDBBackend) Get(key string) (*Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"dynamodb", "get"}, time.Now())
|
||||
|
||||
d.permitPool.Acquire()
|
||||
defer d.permitPool.Release()
|
||||
|
||||
resp, err := d.client.GetItem(&dynamodb.GetItemInput{
|
||||
TableName: aws.String(d.table),
|
||||
ConsistentRead: aws.Bool(true),
|
||||
|
@ -318,6 +334,10 @@ func (d *DynamoDBBackend) List(prefix string) ([]string, error) {
|
|||
},
|
||||
},
|
||||
}
|
||||
|
||||
d.permitPool.Acquire()
|
||||
defer d.permitPool.Release()
|
||||
|
||||
err := d.client.QueryPages(queryInput, func(out *dynamodb.QueryOutput, lastPage bool) bool {
|
||||
var record DynamoDBRecord
|
||||
for _, item := range out.Items {
|
||||
|
@ -357,11 +377,13 @@ func (d *DynamoDBBackend) batchWriteRequests(requests []*dynamodb.WriteRequest)
|
|||
batch := requests[:batchSize]
|
||||
requests = requests[batchSize:]
|
||||
|
||||
d.permitPool.Acquire()
|
||||
_, err := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
|
||||
RequestItems: map[string][]*dynamodb.WriteRequest{
|
||||
d.table: batch,
|
||||
},
|
||||
})
|
||||
d.permitPool.Release()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -265,8 +265,8 @@ For Consul, the following options are supported:
|
|||
|
||||
* `token` (optional) - An access token to use to write data to Consul.
|
||||
|
||||
* `max_parallel` (optional) - The maximum number of concurrent connections to Consul.
|
||||
Defaults to "128".
|
||||
* `max_parallel` (optional) - The maximum number of concurrent requests to Consul.
|
||||
Defaults to `"128"`.
|
||||
|
||||
* `tls_skip_verify` (optional) - If non-empty, then TLS host verification
|
||||
will be disabled for Consul communication. Defaults to false.
|
||||
|
@ -506,6 +506,9 @@ The DynamoDB backend has the following options:
|
|||
`AWS_DEFAULT_REGION` environment variable and will default to `us-east-1`
|
||||
if not specified.
|
||||
|
||||
* `max_parallel` (optional) - The maximum number of concurrent requests to
|
||||
DynamoDB. Defaults to `"128"`.
|
||||
|
||||
* `ha_enabled` (optional) - Setting this to `"1"`, `"t"`, or `"true"` will
|
||||
enable HA mode. Please ensure you have read the documentation for the
|
||||
`recovery_mode` option before enabling this. This option can also be
|
||||
|
@ -563,7 +566,7 @@ profile enabled. Vault will handle renewing profile credentials as they rotate.
|
|||
|
||||
* `container` (required) - The Azure Storage Blob container name
|
||||
|
||||
* `max_parallel` (optional) - The maximum number of concurrent connections to Azure. Defaults to "128".
|
||||
* `max_parallel` (optional) - The maximum number of concurrent requests to Azure. Defaults to `"128"`.
|
||||
|
||||
The current implementation is limited to a maximum of 4 MBytes per blob/file.
|
||||
|
||||
|
@ -581,7 +584,7 @@ For Swift, the following options are supported:
|
|||
|
||||
* `tenant` (optional) - The name of Tenant to use. It can be sourced from the `OS_TENANT_NAME` environment variable and will default to default tenant of for the username if not specified.
|
||||
|
||||
* `max_parallel` (optional) - The maximum number of concurrent connections to Swift. Defaults to "128".
|
||||
* `max_parallel` (optional) - The maximum number of concurrent requests to Swift. Defaults to `"128"`.
|
||||
|
||||
#### Backend Reference: MySQL (Community-Supported)
|
||||
|
||||
|
|
Loading…
Reference in New Issue