consul: Fix non-deterministic ACL IDs

This commit is contained in:
Armon Dadgar 2014-10-09 12:23:32 -07:00
parent a80478594a
commit 1177a9bf11
5 changed files with 72 additions and 57 deletions

View File

@ -55,6 +55,35 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
return fmt.Errorf("ACL rule compilation failed: %v", err)
}
// Check if this is an update
state := a.srv.fsm.State()
var existing *structs.ACL
if args.ACL.ID != "" {
_, existing, err = state.ACLGet(args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
}
}
// If this is a create, generate a new ID. This must
// be done prior to appending to the raft log, because the ID is not
// deterministic. Once the entry is in the log, the state update MUST
// be deterministic or the followers will not converge.
if existing == nil {
for {
args.ACL.ID = generateUUID()
_, acl, err := state.ACLGet(args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
}
if acl == nil {
break
}
}
}
case structs.ACLDelete:
if args.ACL.ID == "" {
return fmt.Errorf("Missing ACL ID")

View File

@ -2,12 +2,13 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
"io"
"log"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
)
// consulFSM implements a finite state machine that is used
@ -186,14 +187,10 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
switch req.Op {
case structs.ACLSet:
if err := c.state.ACLSet(index, &req.ACL, false); err != nil {
return err
} else {
return req.ACL.ID
}
case structs.ACLForceSet:
if err := c.state.ACLSet(index, &req.ACL, true); err != nil {
fallthrough
case structs.ACLSet:
if err := c.state.ACLSet(index, &req.ACL); err != nil {
return err
} else {
return req.ACL.ID

View File

@ -329,8 +329,8 @@ func TestFSM_SnapshotRestore(t *testing.T) {
})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(9, session)
acl := &structs.ACL{Name: "User Token"}
fsm.state.ACLSet(10, acl, false)
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
fsm.state.ACLSet(10, acl)
// Snapshot
snap, err := fsm.Snapshot()
@ -793,6 +793,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
},

View File

@ -1550,9 +1550,12 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
}
// ACLSet is used to create or update an ACL entry
// allowCreate is used for initialization of the anonymous and master tokens,
// since it permits them to be created with a specified ID that does not exist.
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL, allowCreate bool) error {
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
// Check for an ID
if acl.ID == "" {
return fmt.Errorf("Missing ACL ID")
}
// Start a new txn
tx, err := s.tables.StartTxn(false)
if err != nil {
@ -1560,43 +1563,22 @@ func (s *StateStore) ACLSet(index uint64, acl *structs.ACL, allowCreate bool) er
}
defer tx.Abort()
// Generate a new session ID
if acl.ID == "" {
for {
acl.ID = generateUUID()
res, err := s.aclTable.GetTxn(tx, "id", acl.ID)
if err != nil {
return err
}
// Quit if this ID is unique
if len(res) == 0 {
break
}
}
// Look for the existing node
res, err := s.aclTable.GetTxn(tx, "id", acl.ID)
if err != nil {
return err
}
switch len(res) {
case 0:
acl.CreateIndex = index
acl.ModifyIndex = index
} else {
// Look for the existing node
res, err := s.aclTable.GetTxn(tx, "id", acl.ID)
if err != nil {
return err
}
switch len(res) {
case 0:
if !allowCreate {
return fmt.Errorf("Invalid ACL")
}
acl.CreateIndex = index
acl.ModifyIndex = index
case 1:
exist := res[0].(*structs.ACL)
acl.CreateIndex = exist.CreateIndex
acl.ModifyIndex = index
default:
panic(fmt.Errorf("Duplicate ACL definition. Internal error"))
}
case 1:
exist := res[0].(*structs.ACL)
acl.CreateIndex = exist.CreateIndex
acl.ModifyIndex = index
default:
panic(fmt.Errorf("Duplicate ACL definition. Internal error"))
}
// Insert the ACL

View File

@ -705,18 +705,20 @@ func TestStoreSnapshot(t *testing.T) {
}
a1 := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
}
if err := store.ACLSet(19, a1, false); err != nil {
if err := store.ACLSet(19, a1); err != nil {
t.Fatalf("err: %v", err)
}
a2 := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
}
if err := store.ACLSet(20, a2, false); err != nil {
if err := store.ACLSet(20, a2); err != nil {
t.Fatalf("err: %v", err)
}
@ -2237,11 +2239,12 @@ func TestACLSet_Get(t *testing.T) {
}
a := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
Rules: "",
}
if err := store.ACLSet(50, a, false); err != nil {
if err := store.ACLSet(50, a); err != nil {
t.Fatalf("err: %v", err)
}
if a.CreateIndex != 50 {
@ -2267,7 +2270,7 @@ func TestACLSet_Get(t *testing.T) {
// Update
a.Rules = "foo bar baz"
if err := store.ACLSet(52, a, false); err != nil {
if err := store.ACLSet(52, a); err != nil {
t.Fatalf("err: %v", err)
}
if a.CreateIndex != 50 {
@ -2297,11 +2300,12 @@ func TestACLDelete(t *testing.T) {
defer store.Close()
a := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
Rules: "",
}
if err := store.ACLSet(50, a, false); err != nil {
if err := store.ACLSet(50, a); err != nil {
t.Fatalf("err: %v", err)
}
@ -2332,18 +2336,20 @@ func TestACLList(t *testing.T) {
defer store.Close()
a1 := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
}
if err := store.ACLSet(50, a1, false); err != nil {
if err := store.ACLSet(50, a1); err != nil {
t.Fatalf("err: %v", err)
}
a2 := &structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
}
if err := store.ACLSet(51, a2, false); err != nil {
if err := store.ACLSet(51, a2); err != nil {
t.Fatalf("err: %v", err)
}