Don't panic on unknown raft ops (#17732)

* Don't panic on unknown raft ops

* avoid excessive logging

* track at the struct level, not the function level

* add changelog
This commit is contained in:
Josh Black 2022-11-30 15:37:58 -08:00 committed by GitHub
parent 5b731699a1
commit e75633eddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 7 deletions

3
changelog/17732.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
storage/raft: Don't panic on unknown raft ops
```

View File

@ -104,6 +104,7 @@ type FSM struct {
localID string
desiredSuffrage string
unknownOpTypes sync.Map
}
// NewFSM constructs a FSM using the given directory
@ -593,19 +594,19 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
// Do the unmarshalling first so we don't hold locks
var latestConfiguration *ConfigurationValue
commands := make([]interface{}, 0, numLogs)
for _, log := range logs {
switch log.Type {
for _, l := range logs {
switch l.Type {
case raft.LogCommand:
command := &LogData{}
err := proto.Unmarshal(log.Data, command)
err := proto.Unmarshal(l.Data, command)
if err != nil {
f.logger.Error("error proto unmarshaling log data", "error", err)
panic("error proto unmarshaling log data")
}
commands = append(commands, command)
case raft.LogConfiguration:
configuration := raft.DecodeConfiguration(log.Data)
config := raftConfigurationToProtoConfiguration(log.Index, configuration)
configuration := raft.DecodeConfiguration(l.Data)
config := raftConfigurationToProtoConfiguration(l.Index, configuration)
commands = append(commands, config)
@ -614,7 +615,7 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
latestConfiguration = config
default:
panic(fmt.Sprintf("got unexpected log type: %d", log.Type))
panic(fmt.Sprintf("got unexpected log type: %d", l.Type))
}
}
@ -672,7 +673,10 @@ func (f *FSM) ApplyBatch(logs []*raft.Log) []interface{} {
go f.restoreCb(context.Background())
}
default:
return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
if _, ok := f.unknownOpTypes.Load(op.OpType); !ok {
f.logger.Error("unsupported transaction operation", "op", op.OpType)
f.unknownOpTypes.Store(op.OpType, struct{}{})
}
}
if err != nil {
return err