Check to make sure context isn't expired before doing a raft operation. (#12162)
This commit is contained in:
parent
16794711d5
commit
72499c3215
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
storage/raft: Best-effort handling of cancelled contexts.
|
||||
```
|
|
@ -978,6 +978,10 @@ func (b *RaftBackend) RemovePeer(ctx context.Context, peerID string) error {
|
|||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if b.disableAutopilot {
|
||||
if b.raft == nil {
|
||||
return errors.New("raft storage is not initialized")
|
||||
|
@ -1034,6 +1038,10 @@ func (b *RaftBackend) GetConfigurationOffline() (*RaftConfigurationResponse, err
|
|||
}
|
||||
|
||||
func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationResponse, error) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
|
@ -1068,6 +1076,10 @@ func (b *RaftBackend) GetConfiguration(ctx context.Context) (*RaftConfigurationR
|
|||
|
||||
// AddPeer adds a new server to the raft cluster
|
||||
func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) error {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
|
@ -1096,6 +1108,10 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e
|
|||
|
||||
// Peers returns all the servers present in the raft cluster
|
||||
func (b *RaftBackend) Peers(ctx context.Context) ([]Peer, error) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
|
@ -1180,6 +1196,10 @@ func (b *RaftBackend) WriteSnapshotToTemp(in io.ReadCloser, access *seal.Access)
|
|||
// RestoreSnapshot applies the provided snapshot metadata and snapshot data to
|
||||
// raft.
|
||||
func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.SnapshotMeta, snap io.Reader) error {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.l.RLock()
|
||||
defer b.l.RUnlock()
|
||||
|
||||
|
@ -1214,6 +1234,11 @@ func (b *RaftBackend) RestoreSnapshot(ctx context.Context, metadata raft.Snapsho
|
|||
// Delete inserts an entry in the log to delete the given path
|
||||
func (b *RaftBackend) Delete(ctx context.Context, path string) error {
|
||||
defer metrics.MeasureSince([]string{"raft-storage", "delete"}, time.Now())
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
command := &LogData{
|
||||
Operations: []*LogOperation{
|
||||
{
|
||||
|
@ -1238,9 +1263,17 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
|
|||
return nil, errors.New("raft: fsm not configured")
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
defer b.permitPool.Release()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entry, err := b.fsm.Get(ctx, path)
|
||||
if entry != nil {
|
||||
valueLen := len(entry.Value)
|
||||
|
@ -1258,6 +1291,11 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
|
|||
// or if the call to applyLog fails.
|
||||
func (b *RaftBackend) Put(ctx context.Context, entry *physical.Entry) error {
|
||||
defer metrics.MeasureSince([]string{"raft-storage", "put"}, time.Now())
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
command := &LogData{
|
||||
Operations: []*LogOperation{
|
||||
{
|
||||
|
@ -1284,9 +1322,17 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error)
|
|||
return nil, errors.New("raft: fsm not configured")
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.permitPool.Acquire()
|
||||
defer b.permitPool.Release()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.fsm.List(ctx, prefix)
|
||||
}
|
||||
|
||||
|
@ -1294,6 +1340,11 @@ func (b *RaftBackend) List(ctx context.Context, prefix string) ([]string, error)
|
|||
// applies it.
|
||||
func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
|
||||
defer metrics.MeasureSince([]string{"raft-storage", "transaction"}, time.Now())
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
command := &LogData{
|
||||
Operations: make([]*LogOperation, len(txns)),
|
||||
}
|
||||
|
@ -1330,6 +1381,9 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
|
|||
if b.raft == nil {
|
||||
return errors.New("raft storage is not initialized")
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
commandBytes, err := proto.Marshal(command)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue