consul: Fix non-deterministic session IDs

This commit is contained in:
Armon Dadgar 2014-10-09 11:54:47 -07:00
parent 8f85c977bf
commit a80478594a
6 changed files with 69 additions and 41 deletions

View File

@ -2,14 +2,15 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/rpc"
"os"
"sort"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
)
func TestCatalogRegister(t *testing.T) {
@ -670,7 +671,7 @@ func TestCatalogNodeServices(t *testing.T) {
if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
t.Fatalf("bad: %v", out)
}
if services["web"].Tags != nil || services["web"].Port != 80 {
if len(services["web"].Tags) != 0 || services["web"].Port != 80 {
t.Fatalf("bad: %v", out)
}
}

View File

@ -2,10 +2,11 @@ package consul
import (
"bytes"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"os"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
)
type MockSink struct {
@ -326,7 +327,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Key: "/test",
Value: []byte("foo"),
})
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(9, session)
acl := &structs.ACL{Name: "User Token"}
fsm.state.ACLSet(10, acl, false)
@ -611,6 +612,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"web"},
},
@ -679,7 +681,7 @@ func TestFSM_KVSLock(t *testing.T) {
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{
@ -724,7 +726,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{

View File

@ -513,6 +513,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("err: %v")
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}

View File

@ -2,9 +2,10 @@ package consul
import (
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"time"
)
// Session endpoint is used to manipulate sessions for KV
@ -28,6 +29,26 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
return fmt.Errorf("Must provide Node")
}
// If this is a create, we must generate the Session ID. This must
// be done prior to appending to the raft log, because the ID is not
// deterministic. Once the entry is in the log, the state update MUST
// be deterministic or the followers will not converge.
if args.Op == structs.SessionCreate {
// Generate a new session ID, verify uniqueness
state := s.srv.fsm.State()
for {
args.Session.ID = generateUUID()
_, sess, err := state.SessionGet(args.Session.ID)
if err != nil {
s.srv.logger.Printf("[ERR] consul.session: Session lookup failed: %v", err)
return err
}
if sess == nil {
break
}
}
}
// Apply the update
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil {

View File

@ -2,8 +2,6 @@ package consul
import (
"fmt"
"github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/structs"
"io"
"io/ioutil"
"log"
@ -12,6 +10,9 @@ import (
"strings"
"sync"
"time"
"github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/structs"
)
const (
@ -1288,6 +1289,11 @@ func (s *StateStore) kvsSet(
// SessionCreate is used to create a new session. The
// ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
// Verify a Session ID is generated
if session.ID == "" {
return fmt.Errorf("Missing Session ID")
}
// Assign the create index
session.CreateIndex = index
@ -1322,19 +1328,6 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
}
}
// Generate a new session ID, verify uniqueness
for {
session.ID = generateUUID()
res, err = s.sessionTable.GetTxn(tx, "id", session.ID)
if err != nil {
return err
}
// Quit if this ID is unique
if len(res) == 0 {
break
}
}
// Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err

View File

@ -1,12 +1,13 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"os"
"reflect"
"sort"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
)
func testStateStore() (*StateStore, error) {
@ -51,7 +52,7 @@ func TestEnsureRegistration(t *testing.T) {
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tags != nil || entry.Port != 5000 {
if len(entry.Tags) != 0 || entry.Port != 5000 {
t.Fatalf("Bad entry: %#v", entry)
}
@ -169,7 +170,7 @@ func TestEnsureService(t *testing.T) {
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tags != nil || entry.Port != 5001 {
if len(entry.Tags) != 0 || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry)
}
@ -214,7 +215,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tags != nil || entry.Port != 5000 {
if len(entry.Tags) != 0 || entry.Port != 5000 {
t.Fatalf("Bad entry: %#v", entry)
}
@ -222,7 +223,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tags != nil || entry.Port != 5001 {
if len(entry.Tags) != 0 || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry)
}
@ -230,7 +231,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tags != nil || entry.Port != 5002 {
if len(entry.Tags) != 0 || entry.Port != 5002 {
t.Fatalf("Bad entry: %#v", entry)
}
}
@ -689,12 +690,12 @@ func TestStoreSnapshot(t *testing.T) {
}
// Add some sessions
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(16, session); err != nil {
t.Fatalf("err: %v", err)
}
session = &structs.Session{Node: "bar"}
session = &structs.Session{ID: generateUUID(), Node: "bar"}
if err := store.SessionCreate(17, session); err != nil {
t.Fatalf("err: %v", err)
}
@ -1720,6 +1721,7 @@ func TestSessionCreate(t *testing.T) {
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"bar"},
}
@ -1728,10 +1730,6 @@ func TestSessionCreate(t *testing.T) {
t.Fatalf("err: %v", err)
}
if session.ID == "" {
t.Fatalf("bad: %v", session)
}
if session.CreateIndex != 1000 {
t.Fatalf("bad: %v", session)
}
@ -1746,6 +1744,7 @@ func TestSessionCreate_Invalid(t *testing.T) {
// No node registered
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"bar"},
}
@ -1787,7 +1786,9 @@ func TestSession_Lookups(t *testing.T) {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
ID: generateUUID(),
Node: "foo",
Checks: []string{},
}
if err := store.SessionCreate(1000, session); err != nil {
t.Fatalf("err: %v", err)
@ -1802,13 +1803,14 @@ func TestSession_Lookups(t *testing.T) {
t.Fatalf("bad: %v", idx)
}
if !reflect.DeepEqual(s2, session) {
t.Fatalf("bad: %v", s2)
t.Fatalf("bad: %#v %#v", s2, session)
}
// Create many sessions
ids := []string{session.ID}
for i := 0; i < 10; i++ {
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
}
if err := store.SessionCreate(uint64(1000+i), session); err != nil {
@ -1878,6 +1880,7 @@ func TestSessionInvalidate_CriticalHealthCheck(t *testing.T) {
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"bar"},
}
@ -1921,6 +1924,7 @@ func TestSessionInvalidate_DeleteHealthCheck(t *testing.T) {
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"bar"},
}
@ -1955,6 +1959,7 @@ func TestSessionInvalidate_DeleteNode(t *testing.T) {
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
}
if err := store.SessionCreate(14, session); err != nil {
@ -2001,6 +2006,7 @@ func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []string{"api"},
}
@ -2033,7 +2039,7 @@ func TestKVSLock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
@ -2106,7 +2112,7 @@ func TestKVSUnlock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
@ -2164,7 +2170,11 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}