physical/spanner: use separate client for updating locks (#9423)
* physical/spanner: use separate client for updating locks

We believe this mitigates an issue where a large influx of requests causes the leader to be unable to update the lock table (because it cannot grab a client from the pool, or the client has no more open connections), which leads to cascading failure.
This commit is contained in:
parent 95a2d61651
commit d00adf89c9
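Conceptually, the fix gives lock maintenance its own Spanner client so that data traffic gated by the permit pool can never starve the lock-TTL refresh. The sketch below is a minimal illustration of that pattern, not the Vault code: the haBackend type, the permits channel, the refreshLock helper, and its table/column names are assumptions made for the example.

```go
package spannerha

import (
	"context"

	"cloud.google.com/go/spanner"
)

// haBackend is a hypothetical, stripped-down backend: data operations share
// dataClient and must take a permit, while lock TTL refreshes go through a
// dedicated haClient and never wait on the permit pool.
type haBackend struct {
	dataClient *spanner.Client // shared by data operations, gated by permits
	haClient   *spanner.Client // reserved for lock maintenance
	permits    chan struct{}   // simple permit pool (capacity = max_parallel)
}

func newHABackend(ctx context.Context, database string, maxParallel int) (*haBackend, error) {
	dataClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		return nil, err
	}
	haClient, err := spanner.NewClient(ctx, database)
	if err != nil {
		return nil, err
	}
	return &haBackend{
		dataClient: dataClient,
		haClient:   haClient,
		permits:    make(chan struct{}, maxParallel),
	}, nil
}

// refreshLock upserts the lock row on the dedicated client, so it proceeds
// even when every permit is held by slow data requests.
func (b *haBackend) refreshLock(ctx context.Context, haTable, key, identity string) error {
	_, err := b.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		return txn.BufferWrite([]*spanner.Mutation{
			spanner.InsertOrUpdate(haTable,
				[]string{"Key", "Identity", "Timestamp"},
				[]interface{}{key, identity, spanner.CommitTimestamp}),
		})
	})
	return err
}
```

The important property is that refreshLock touches neither permits nor dataClient, so its latency is independent of how much data traffic is queued.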
@@ -79,18 +79,26 @@ type Backend struct {
 	// table is the name of the table in the database.
 	table string
 
-	// client is the API client and permitPool is the allowed concurrent uses of
-	// the client.
-	client     *spanner.Client
-	permitPool *physical.PermitPool
-
 	// haTable is the name of the table to use for HA in the database.
 	haTable string
 
 	// haEnabled indicates if high availability is enabled. Default: true.
 	haEnabled bool
 
-	// logger is the internal logger.
-	logger log.Logger
+	// client is the underlying API client for talking to spanner.
+	client *spanner.Client
+
+	// haClient is the API client. This is managed separately from the main client
+	// because a flood of requests should not block refreshing the TTLs on the
+	// lock.
+	//
+	// This value will be nil if haEnabled is false.
+	haClient *spanner.Client
+
+	// logger and permitPool are internal constructs.
+	logger     log.Logger
+	permitPool *physical.PermitPool
 }
 
 // NewBackend creates a new Google Spanner storage backend with the given
@@ -127,6 +135,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
 	}
 
 	// HA configuration
+	haClient := (*spanner.Client)(nil)
 	haEnabled := false
 	haEnabledStr := os.Getenv(envHAEnabled)
 	if haEnabledStr == "" {
@@ -139,6 +148,17 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
 			return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
 		}
 	}
+	if haEnabled {
+		logger.Debug("creating HA client")
+		var err error
+		ctx := context.Background()
+		haClient, err = spanner.NewClient(ctx, database,
+			option.WithUserAgent(useragent.String()),
+		)
+		if err != nil {
+			return nil, errwrap.Wrapf("failed to create HA client: {{err}}", err)
+		}
+	}
 
 	// Max parallel
 	maxParallel, err := extractInt(c["max_parallel"])
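The lines that actually parse haEnabledStr fall between the two hunks above and are not shown. As a hedged illustration only, a generic version of that toggle might look like the helper below; the "ha_enabled" config key and the def fallback are assumptions, since only os.Getenv(envHAEnabled) and the ParseBool error message are visible in the diff.

```go
package spannerha

import (
	"fmt"
	"os"
	"strconv"
)

// resolveHAEnabled is a hypothetical helper, not the Vault code: it mirrors
// the shape of the toggle that decides whether the HA client gets created.
// The environment variable wins over the config value when both are set.
func resolveHAEnabled(c map[string]string, envHAEnabled string, def bool) (bool, error) {
	v := os.Getenv(envHAEnabled)
	if v == "" {
		v = c["ha_enabled"] // assumed config key; not shown in the diff
	}
	if v == "" {
		return def, nil
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("failed to parse HA enabled: %w", err)
	}
	return b, nil
}
```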
@@ -153,8 +173,8 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
 		"haTable", haTable,
 		"maxParallel", maxParallel,
 	)
-	logger.Debug("creating client")
 
+	logger.Debug("creating client")
 	ctx := context.Background()
 	client, err := spanner.NewClient(ctx, database,
 		option.WithUserAgent(useragent.String()),
@@ -164,14 +184,16 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
 	}
 
 	return &Backend{
-		database:   database,
-		table:      table,
-		haEnabled:  haEnabled,
-		haTable:    haTable,
+		database: database,
+		table:    table,
 		client:     client,
 		permitPool: physical.NewPermitPool(maxParallel),
-		logger:     logger,
+
+		haEnabled: haEnabled,
+		haTable:   haTable,
+		haClient:  haClient,
+
+		logger: logger,
 	}, nil
 }
 
@@ -175,13 +175,9 @@ func (l *Lock) Unlock() error {
 	}
 	l.stopLock.Unlock()
 
-	// Pooling
-	l.backend.permitPool.Acquire()
-	defer l.backend.permitPool.Release()
-
 	// Delete
 	ctx := context.Background()
-	if _, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+	if _, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
 		row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Identity"})
 		if err != nil {
 			if spanner.ErrCode(err) != codes.NotFound {
@@ -327,10 +323,6 @@ OUTER:
// - if key is empty or identity is the same or timestamp exceeds TTL
// - update the lock to self
 func (l *Lock) writeLock() (bool, error) {
-	// Pooling
-	l.backend.permitPool.Acquire()
-	defer l.backend.permitPool.Release()
-
 	// Keep track of whether the lock was written
 	lockWritten := false
 
@@ -349,7 +341,7 @@ func (l *Lock) writeLock() (bool, error) {
 		}
 	}()
 
-	_, err := l.backend.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+	_, err := l.backend.haClient.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
 		row, err := txn.ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Identity", "Timestamp"})
 		if err != nil && spanner.ErrCode(err) != codes.NotFound {
 			return err
@@ -396,12 +388,8 @@ func (l *Lock) writeLock() (bool, error) {
 
 // get retrieves the value for the lock.
 func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
-	// Pooling
-	l.backend.permitPool.Acquire()
-	defer l.backend.permitPool.Release()
-
 	// Read
-	row, err := l.backend.client.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
+	row, err := l.backend.haClient.Single().ReadRow(ctx, l.backend.haTable, spanner.Key{l.key}, []string{"Key", "Value", "Timestamp", "Identity"})
 	if spanner.ErrCode(err) == codes.NotFound {
 		return nil, nil
 	}
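To see why removing the permit-pool acquisition from Unlock, writeLock, and get matters, the following self-contained program (plain Go, no Spanner, all names invented for the illustration) simulates the failure mode from the commit message: with every permit held by slow data operations, a refresh that must borrow a permit misses its deadline, while one that bypasses the pool, as the lock paths now do via haClient, runs immediately.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const maxParallel = 4
	permits := make(chan struct{}, maxParallel)

	// Saturate the permit pool with slow "data" requests.
	for i := 0; i < maxParallel; i++ {
		permits <- struct{}{}
		go func() {
			defer func() { <-permits }()
			time.Sleep(5 * time.Second) // slow storage operation holding a permit
		}()
	}

	// A lock refresh that shares the pool blocks until a permit frees up.
	select {
	case permits <- struct{}{}:
		fmt.Println("shared-pool refresh ran")
		<-permits
	case <-time.After(500 * time.Millisecond):
		fmt.Println("shared-pool refresh timed out; the lock TTL would lapse")
	}

	// A lock refresh on a dedicated client has nothing to wait on.
	fmt.Println("dedicated-client refresh ran immediately")
}
```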