Prevent raft transactions from containing overlarge keys. (#13286)

This commit is contained in:
Nick Cabatoff 2021-11-26 08:38:39 -05:00 committed by GitHub
parent f85908e1df
commit 997a5ace91
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 0 deletions

3
changelog/13286.txt Normal file
View file

@ -0,0 +1,3 @@
```release-note:bug
storage/raft: Fix a panic when trying to store a key > 32KB in a transaction.
```

View file

@ -1352,6 +1352,9 @@ func (b *RaftBackend) Transaction(ctx context.Context, txns []*physical.TxnEntry
op := &LogOperation{} op := &LogOperation{}
switch txn.Operation { switch txn.Operation {
case physical.PutOperation: case physical.PutOperation:
if len(txn.Entry.Key) > bolt.MaxKeySize {
return fmt.Errorf("%s, max key size for integrated storage is %d", physical.ErrKeyTooLarge, bolt.MaxKeySize)
}
op.OpType = putOp op.OpType = putOp
op.Key = txn.Entry.Key op.Key = txn.Entry.Key
op.Value = txn.Entry.Value op.Value = txn.Entry.Value

View file

@ -280,6 +280,45 @@ func TestRaft_Backend_LargeValue(t *testing.T) {
} }
} }
func TestRaft_TransactionalBackend_LargeKey(t *testing.T) {
b, dir := getRaft(t, true, true)
defer os.RemoveAll(dir)
value := make([]byte, defaultMaxEntrySize+1)
rand.Read(value)
key, err := base62.Random(bolt.MaxKeySize + 1)
if err != nil {
t.Fatal(err)
}
txns := []*physical.TxnEntry{
{
Operation: physical.PutOperation,
Entry: &physical.Entry{
Key: key,
Value: []byte(key),
},
},
}
err = b.Transaction(context.Background(), txns)
if err == nil {
t.Fatal("expected error for transactions")
}
if !strings.Contains(err.Error(), physical.ErrKeyTooLarge) {
t.Fatalf("expected %q, got %v", physical.ErrValueTooLarge, err)
}
out, err := b.Get(context.Background(), txns[0].Entry.Key)
if err != nil {
t.Fatalf("unexpected error after failed put: %v", err)
}
if out != nil {
t.Fatal("expected response entry to be nil after a failed put")
}
}
func TestRaft_TransactionalBackend_LargeValue(t *testing.T) { func TestRaft_TransactionalBackend_LargeValue(t *testing.T) {
b, dir := getRaft(t, true, true) b, dir := getRaft(t, true, true)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)