Plumb context through manta
This commit is contained in:
parent
2a8159c2ae
commit
4f984569fa
|
@ -32,7 +32,6 @@ type MantaBackend struct {
|
|||
}
|
||||
|
||||
func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
|
||||
|
||||
user := os.Getenv("MANTA_USER")
|
||||
if user == "" {
|
||||
user = conf["user"]
|
||||
|
@ -99,7 +98,7 @@ func NewMantaBackend(conf map[string]string, logger log.Logger) (physical.Backen
|
|||
}
|
||||
|
||||
// Put is used to insert or update an entry
|
||||
func (m *MantaBackend) Put(entry *physical.Entry) error {
|
||||
func (m *MantaBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"manta", "put"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
|
@ -108,7 +107,7 @@ func (m *MantaBackend) Put(entry *physical.Entry) error {
|
|||
r := bytes.NewReader(entry.Value)
|
||||
r.Seek(0, 0)
|
||||
|
||||
return m.client.Objects().Put(context.Background(), &storage.PutObjectInput{
|
||||
return m.client.Objects().Put(ctx, &storage.PutObjectInput{
|
||||
ObjectPath: path.Join(mantaDefaultRootStore, m.directory, entry.Key, ".vault_value"),
|
||||
ObjectReader: r,
|
||||
ContentLength: uint64(len(entry.Value)),
|
||||
|
@ -117,13 +116,13 @@ func (m *MantaBackend) Put(entry *physical.Entry) error {
|
|||
}
|
||||
|
||||
// Get is used to fetch an entry
|
||||
func (m *MantaBackend) Get(key string) (*physical.Entry, error) {
|
||||
func (m *MantaBackend) Get(ctx context.Context, key string) (*physical.Entry, error) {
|
||||
defer metrics.MeasureSince([]string{"manta", "get"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
defer m.permitPool.Release()
|
||||
|
||||
output, err := m.client.Objects().Get(context.Background(), &storage.GetObjectInput{
|
||||
output, err := m.client.Objects().Get(ctx, &storage.GetObjectInput{
|
||||
ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -150,14 +149,14 @@ func (m *MantaBackend) Get(key string) (*physical.Entry, error) {
|
|||
}
|
||||
|
||||
// Delete is used to permanently delete an entry
|
||||
func (m *MantaBackend) Delete(key string) error {
|
||||
func (m *MantaBackend) Delete(ctx context.Context, key string) error {
|
||||
defer metrics.MeasureSince([]string{"manta", "delete"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
defer m.permitPool.Release()
|
||||
|
||||
if strings.HasSuffix(key, "/") {
|
||||
err := m.client.Dir().Delete(context.Background(), &storage.DeleteDirectoryInput{
|
||||
err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
|
||||
DirectoryName: path.Join(mantaDefaultRootStore, m.directory, key),
|
||||
ForceDelete: true,
|
||||
})
|
||||
|
@ -165,7 +164,7 @@ func (m *MantaBackend) Delete(key string) error {
|
|||
return err
|
||||
}
|
||||
} else {
|
||||
err := m.client.Objects().Delete(context.Background(), &storage.DeleteObjectInput{
|
||||
err := m.client.Objects().Delete(ctx, &storage.DeleteObjectInput{
|
||||
ObjectPath: path.Join(mantaDefaultRootStore, m.directory, key, ".vault_value"),
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -175,14 +174,14 @@ func (m *MantaBackend) Delete(key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return tryDeleteDirectory(m, path.Join(mantaDefaultRootStore, m.directory, key))
|
||||
return tryDeleteDirectory(ctx, m, path.Join(mantaDefaultRootStore, m.directory, key))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func tryDeleteDirectory(m *MantaBackend, directoryPath string) error {
|
||||
objs, err := m.client.Dir().List(context.Background(), &storage.ListDirectoryInput{
|
||||
func tryDeleteDirectory(ctx context.Context, m *MantaBackend, directoryPath string) error {
|
||||
objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
|
||||
DirectoryName: directoryPath,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -192,27 +191,27 @@ func tryDeleteDirectory(m *MantaBackend, directoryPath string) error {
|
|||
return err
|
||||
}
|
||||
if objs != nil && len(objs.Entries) == 0 {
|
||||
err := m.client.Dir().Delete(context.Background(), &storage.DeleteDirectoryInput{
|
||||
err := m.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
|
||||
DirectoryName: directoryPath,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tryDeleteDirectory(m, path.Dir(directoryPath))
|
||||
return tryDeleteDirectory(ctx, m, path.Dir(directoryPath))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List is used to list all the keys under a given
|
||||
// prefix, up to the next prefix.
|
||||
func (m *MantaBackend) List(prefix string) ([]string, error) {
|
||||
func (m *MantaBackend) List(ctx context.Context, prefix string) ([]string, error) {
|
||||
defer metrics.MeasureSince([]string{"manta", "list"}, time.Now())
|
||||
|
||||
m.permitPool.Acquire()
|
||||
defer m.permitPool.Release()
|
||||
|
||||
objs, err := m.client.Dir().List(context.Background(), &storage.ListDirectoryInput{
|
||||
objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
|
||||
DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix),
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -225,7 +224,7 @@ func (m *MantaBackend) List(prefix string) ([]string, error) {
|
|||
keys := []string{}
|
||||
for _, obj := range objs.Entries {
|
||||
if obj.Type == "directory" {
|
||||
objs, err := m.client.Dir().List(context.Background(), &storage.ListDirectoryInput{
|
||||
objs, err := m.client.Dir().List(ctx, &storage.ListDirectoryInput{
|
||||
DirectoryName: path.Join(mantaDefaultRootStore, m.directory, prefix, obj.Name),
|
||||
})
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue