Merge pull request #1501 from hashicorp/azure-permitpool
Add permitPool support to Azure
This commit is contained in:
commit
f7cfb66510
|
@ -7,11 +7,13 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/storage"
|
"github.com/Azure/azure-sdk-for-go/storage"
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/errwrap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxBlobSize at this time
|
// MaxBlobSize at this time
|
||||||
|
@ -23,6 +25,7 @@ type AzureBackend struct {
|
||||||
container string
|
container string
|
||||||
client storage.BlobStorageClient
|
client storage.BlobStorageClient
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
permitPool *PermitPool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newAzureBackend constructs an Azure backend using a pre-existing
|
// newAzureBackend constructs an Azure backend using a pre-existing
|
||||||
|
@ -62,10 +65,21 @@ func newAzureBackend(conf map[string]string, logger *log.Logger) (Backend, error
|
||||||
|
|
||||||
client.GetBlobService().CreateContainerIfNotExists(container, storage.ContainerAccessTypePrivate)
|
client.GetBlobService().CreateContainerIfNotExists(container, storage.ContainerAccessTypePrivate)
|
||||||
|
|
||||||
|
maxParStr, ok := conf["max_parallel"]
|
||||||
|
var maxParInt int
|
||||||
|
if ok {
|
||||||
|
maxParInt, err = strconv.Atoi(maxParStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errwrap.Wrapf("failed parsing max_parallel parameter: {{err}}", err)
|
||||||
|
}
|
||||||
|
logger.Printf("[DEBUG]: azure: max_parallel set to %d", maxParInt)
|
||||||
|
}
|
||||||
|
|
||||||
a := &AzureBackend{
|
a := &AzureBackend{
|
||||||
container: container,
|
container: container,
|
||||||
client: client.GetBlobService(),
|
client: client.GetBlobService(),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
permitPool: NewPermitPool(maxParInt),
|
||||||
}
|
}
|
||||||
return a, nil
|
return a, nil
|
||||||
}
|
}
|
||||||
|
@ -82,6 +96,9 @@ func (a *AzureBackend) Put(entry *Entry) error {
|
||||||
blocks := make([]storage.Block, 1)
|
blocks := make([]storage.Block, 1)
|
||||||
blocks[0] = storage.Block{ID: blockID, Status: storage.BlockStatusLatest}
|
blocks[0] = storage.Block{ID: blockID, Status: storage.BlockStatusLatest}
|
||||||
|
|
||||||
|
a.permitPool.Acquire()
|
||||||
|
defer a.permitPool.Release()
|
||||||
|
|
||||||
err := a.client.PutBlock(a.container, entry.Key, blockID, entry.Value)
|
err := a.client.PutBlock(a.container, entry.Key, blockID, entry.Value)
|
||||||
|
|
||||||
err = a.client.PutBlockList(a.container, entry.Key, blocks)
|
err = a.client.PutBlockList(a.container, entry.Key, blocks)
|
||||||
|
@ -92,6 +109,9 @@ func (a *AzureBackend) Put(entry *Entry) error {
|
||||||
func (a *AzureBackend) Get(key string) (*Entry, error) {
|
func (a *AzureBackend) Get(key string) (*Entry, error) {
|
||||||
defer metrics.MeasureSince([]string{"azure", "get"}, time.Now())
|
defer metrics.MeasureSince([]string{"azure", "get"}, time.Now())
|
||||||
|
|
||||||
|
a.permitPool.Acquire()
|
||||||
|
defer a.permitPool.Release()
|
||||||
|
|
||||||
exists, _ := a.client.BlobExists(a.container, key)
|
exists, _ := a.client.BlobExists(a.container, key)
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
|
@ -117,6 +137,10 @@ func (a *AzureBackend) Get(key string) (*Entry, error) {
|
||||||
// Delete is used to permanently delete an entry
|
// Delete is used to permanently delete an entry
|
||||||
func (a *AzureBackend) Delete(key string) error {
|
func (a *AzureBackend) Delete(key string) error {
|
||||||
defer metrics.MeasureSince([]string{"azure", "delete"}, time.Now())
|
defer metrics.MeasureSince([]string{"azure", "delete"}, time.Now())
|
||||||
|
|
||||||
|
a.permitPool.Acquire()
|
||||||
|
defer a.permitPool.Release()
|
||||||
|
|
||||||
_, err := a.client.DeleteBlobIfExists(a.container, key, nil)
|
_, err := a.client.DeleteBlobIfExists(a.container, key, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -126,6 +150,9 @@ func (a *AzureBackend) Delete(key string) error {
|
||||||
func (a *AzureBackend) List(prefix string) ([]string, error) {
|
func (a *AzureBackend) List(prefix string) ([]string, error) {
|
||||||
defer metrics.MeasureSince([]string{"azure", "list"}, time.Now())
|
defer metrics.MeasureSince([]string{"azure", "list"}, time.Now())
|
||||||
|
|
||||||
|
a.permitPool.Acquire()
|
||||||
|
defer a.permitPool.Release()
|
||||||
|
|
||||||
list, err := a.client.ListBlobs(a.container, storage.ListBlobsParameters{Prefix: prefix})
|
list, err := a.client.ListBlobs(a.container, storage.ListBlobsParameters{Prefix: prefix})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -463,9 +463,13 @@ profile enabled. Vault will handle renewing profile credentials as they rotate.
|
||||||
#### Backend Reference: Azure (Community-Supported)
|
#### Backend Reference: Azure (Community-Supported)
|
||||||
|
|
||||||
* `accountName` (required) - The Azure Storage account name
|
* `accountName` (required) - The Azure Storage account name
|
||||||
|
|
||||||
* `accountKey` (required) - The Azure Storage account key
|
* `accountKey` (required) - The Azure Storage account key
|
||||||
|
|
||||||
* `container` (required) - The Azure Storage Blob container name
|
* `container` (required) - The Azure Storage Blob container name
|
||||||
|
|
||||||
|
* `max_parallel` (optional) - The maximum number of concurrent connections to Azure. Defaults to "128".
|
||||||
|
|
||||||
The current implementation is limited to a maximum of 4 MBytes per blob/file.
|
The current implementation is limited to a maximum of 4 MBytes per blob/file.
|
||||||
|
|
||||||
#### Backend Reference: MySQL (Community-Supported)
|
#### Backend Reference: MySQL (Community-Supported)
|
||||||
|
|
Loading…
Reference in a new issue