DynamoDB: Make Unlock key delete conditional on being old leader's (#6637)
This commit is contained in:
parent
cb3e1c1d19
commit
de885660e8
|
@ -174,7 +174,7 @@ func NewDynamoDBBackend(conf map[string]string, logger log.Logger) (physical.Bac
|
||||||
if dynamodbMaxRetryString == "" {
|
if dynamodbMaxRetryString == "" {
|
||||||
dynamodbMaxRetryString = conf["dynamodb_max_retries"]
|
dynamodbMaxRetryString = conf["dynamodb_max_retries"]
|
||||||
}
|
}
|
||||||
var dynamodbMaxRetry int = aws.UseServiceDefaultRetries
|
var dynamodbMaxRetry = aws.UseServiceDefaultRetries
|
||||||
if dynamodbMaxRetryString != "" {
|
if dynamodbMaxRetryString != "" {
|
||||||
var err error
|
var err error
|
||||||
dynamodbMaxRetry, err = strconv.Atoi(dynamodbMaxRetryString)
|
dynamodbMaxRetry, err = strconv.Atoi(dynamodbMaxRetryString)
|
||||||
|
@ -571,10 +571,31 @@ func (l *DynamoDBLock) Unlock() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
l.held = false
|
l.held = false
|
||||||
if err := l.backend.Delete(context.Background(), l.key); err != nil {
|
|
||||||
return err
|
// Conditionally delete after check that the key is actually this Vault's and
|
||||||
|
// not been already claimed by another leader
|
||||||
|
condition := "#identity = :identity"
|
||||||
|
deleteMyLock := &dynamodb.DeleteItemInput{
|
||||||
|
TableName: &l.backend.table,
|
||||||
|
ConditionExpression: &condition,
|
||||||
|
Key: map[string]*dynamodb.AttributeValue{
|
||||||
|
"Path": {S: aws.String(recordPathForVaultKey(l.key))},
|
||||||
|
"Key": {S: aws.String(recordKeyForVaultKey(l.key))},
|
||||||
|
},
|
||||||
|
ExpressionAttributeNames: map[string]*string{
|
||||||
|
"#identity": aws.String("Identity"),
|
||||||
|
},
|
||||||
|
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
|
||||||
|
":identity": {B: []byte(l.identity)},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
_, err := l.backend.client.DeleteItem(deleteMyLock)
|
||||||
|
if isConditionCheckFailed(err) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Value checks whether or not the lock is held by any instance of DynamoDBLock,
|
// Value checks whether or not the lock is held by any instance of DynamoDBLock,
|
||||||
|
@ -612,7 +633,7 @@ func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error)
|
||||||
if err, ok := err.(awserr.Error); ok {
|
if err, ok := err.(awserr.Error); ok {
|
||||||
// Don't report a condition check failure, this means that the lock
|
// Don't report a condition check failure, this means that the lock
|
||||||
// is already being held.
|
// is already being held.
|
||||||
if err.Code() != dynamodb.ErrCodeConditionalCheckFailedException {
|
if !isConditionCheckFailed(err) {
|
||||||
errors <- err
|
errors <- err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -634,7 +655,12 @@ func (l *DynamoDBLock) periodicallyRenewLock(done chan struct{}) {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
// This should not renew the lock if the lock was deleted from under you.
|
// This should not renew the lock if the lock was deleted from under you.
|
||||||
l.updateItem(false)
|
err := l.updateItem(false)
|
||||||
|
if err != nil {
|
||||||
|
if !isConditionCheckFailed(err) {
|
||||||
|
l.backend.logger.Error("error renewing leadership lock", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
case <-done:
|
case <-done:
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
|
@ -665,8 +691,8 @@ func (l *DynamoDBLock) updateItem(createIfMissing bool) error {
|
||||||
_, err := l.backend.client.UpdateItem(&dynamodb.UpdateItemInput{
|
_, err := l.backend.client.UpdateItem(&dynamodb.UpdateItemInput{
|
||||||
TableName: aws.String(l.backend.table),
|
TableName: aws.String(l.backend.table),
|
||||||
Key: map[string]*dynamodb.AttributeValue{
|
Key: map[string]*dynamodb.AttributeValue{
|
||||||
"Path": &dynamodb.AttributeValue{S: aws.String(recordPathForVaultKey(l.key))},
|
"Path": {S: aws.String(recordPathForVaultKey(l.key))},
|
||||||
"Key": &dynamodb.AttributeValue{S: aws.String(recordKeyForVaultKey(l.key))},
|
"Key": {S: aws.String(recordKeyForVaultKey(l.key))},
|
||||||
},
|
},
|
||||||
UpdateExpression: aws.String("SET #value=:value, #identity=:identity, #expires=:expires"),
|
UpdateExpression: aws.String("SET #value=:value, #identity=:identity, #expires=:expires"),
|
||||||
// If both key and path already exist, we can only write if
|
// If both key and path already exist, we can only write if
|
||||||
|
@ -682,12 +708,13 @@ func (l *DynamoDBLock) updateItem(createIfMissing bool) error {
|
||||||
"#value": aws.String("Value"),
|
"#value": aws.String("Value"),
|
||||||
},
|
},
|
||||||
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
|
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
|
||||||
":identity": &dynamodb.AttributeValue{B: []byte(l.identity)},
|
":identity": {B: []byte(l.identity)},
|
||||||
":value": &dynamodb.AttributeValue{B: []byte(l.value)},
|
":value": {B: []byte(l.value)},
|
||||||
":now": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.UnixNano(), 10))},
|
":now": {N: aws.String(strconv.FormatInt(now.UnixNano(), 10))},
|
||||||
":expires": &dynamodb.AttributeValue{N: aws.String(strconv.FormatInt(now.Add(l.ttl).UnixNano(), 10))},
|
":expires": {N: aws.String(strconv.FormatInt(now.Add(l.ttl).UnixNano(), 10))},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,3 +858,15 @@ func unescapeEmptyPath(s string) string {
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isConditionCheckFailed tests whether err is an ErrCodeConditionalCheckFailedException
|
||||||
|
// from the AWS SDK.
|
||||||
|
func isConditionCheckFailed(err error) bool {
|
||||||
|
if err != nil {
|
||||||
|
if err, ok := err.(awserr.Error); ok {
|
||||||
|
return err.Code() == dynamodb.ErrCodeConditionalCheckFailedException
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -576,7 +576,9 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
|
||||||
c.logger.Error("clearing leader advertisement failed", "error", err)
|
c.logger.Error("clearing leader advertisement failed", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.heldHALock.Unlock()
|
if err := c.heldHALock.Unlock(); err != nil {
|
||||||
|
c.logger.Error("unlocking HA lock failed", "error", err)
|
||||||
|
}
|
||||||
c.heldHALock = nil
|
c.heldHALock = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue