Merge pull request #162 from hashicorp/f-locking

Adding support for sessions and locking in the KV store
This commit is contained in:
Armon Dadgar 2014-05-20 16:41:20 -07:00
commit f2a65e4568
42 changed files with 2753 additions and 150 deletions

View File

@ -9,7 +9,7 @@
"api_key": "{{ user `do_api_key` }}",
"client_id": "{{ user `do_client_id` }}",
"region_id": "1",
"size_id": "61",
"size_id": "66",
"image_id": "3101045",
"snapshot_name": "bench-bootstrap-{{ isotime }}",
"name": "bootstrap"
@ -19,7 +19,7 @@
"api_key": "{{ user `do_api_key` }}",
"client_id": "{{ user `do_client_id` }}",
"region_id": "1",
"size_id": "61",
"size_id": "66",
"image_id": "3101045",
"snapshot_name": "bench-server-{{ isotime }}",
"name": "server"
@ -29,7 +29,7 @@
"api_key": "{{ user `do_api_key` }}",
"client_id": "{{ user `do_client_id` }}",
"region_id": "1",
"size_id": "61",
"size_id": "66",
"image_id": "3101045",
"snapshot_name": "bench-worker-{{ isotime }}",
"name": "worker"
@ -73,8 +73,8 @@
{
"type": "shell",
"inline": [
"curl https://s3.amazonaws.com/hc-ops/boom_linux_amd64 -o /usr/bin/boom",
"chmod +x /usr/bin/boom"
"curl https://s3.amazonaws.com/hc-ops/boom_linux_amd64 -o /usr/local/bin/boom",
"chmod +x /usr/local/bin/boom"
]
},
{

View File

@ -1,4 +1,5 @@
{
"data_dir": "/var/lib/consul",
"enable_debug": true,
"log_level": "info"
}

View File

@ -1,15 +1,15 @@
package agent
import (
"errors"
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
"net/http"
"os"
"testing"
"time"
"errors"
)
func TestHTTPAgentServices(t *testing.T) {

View File

@ -2,8 +2,8 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/http"
"net/http/httptest"
"os"
@ -174,7 +174,7 @@ func TestCatalogNodes_Blocking(t *testing.T) {
}
// Should block for a while
if time.Now().Sub(start) < 50 * time.Millisecond {
if time.Now().Sub(start) < 50*time.Millisecond {
// TODO: Failing
t.Fatalf("too fast")
}

View File

@ -2,8 +2,8 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/miekg/dns"
"os"
"strings"

View File

@ -2,8 +2,8 @@ package agent
import (
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/http"
"net/http/httptest"
"os"

View File

@ -93,6 +93,12 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate))
s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy))
s.mux.HandleFunc("/v1/session/info/", s.wrap(s.SessionGet))
s.mux.HandleFunc("/v1/session/node/", s.wrap(s.SessionsForNode))
s.mux.HandleFunc("/v1/session/list", s.wrap(s.SessionList))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"io"
"io/ioutil"
"net/http"
@ -255,3 +256,12 @@ func getIndex(t *testing.T, resp *httptest.ResponseRecorder) uint64 {
}
return uint64(val)
}
func httpTest(t *testing.T, f func(srv *HTTPServer)) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
f(srv)
}

View File

@ -156,6 +156,18 @@ func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *s
applyReq.Op = structs.KVSCAS
}
// Check for lock acquisition
if _, ok := params["acquire"]; ok {
applyReq.DirEnt.Session = params.Get("acquire")
applyReq.Op = structs.KVSLock
}
// Check for lock release
if _, ok := params["release"]; ok {
applyReq.DirEnt.Session = params.Get("release")
applyReq.Op = structs.KVSUnlock
}
// Check the content-length
if req.ContentLength > maxKVSize {
resp.WriteHeader(413)

View File

@ -3,8 +3,8 @@ package agent
import (
"bytes"
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/http"
"net/http/httptest"
"os"
@ -339,3 +339,73 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
}
}
}
func TestKVSEndpoint_AcquireRelease(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Acquire the lock
id := makeTestSession(t, srv)
req, err := http.NewRequest("PUT",
"/v1/kv/test?acquire="+id, bytes.NewReader(nil))
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
// Verify we have the lock
req, err = http.NewRequest("GET", "/v1/kv/test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
d := obj.(structs.DirEntries)[0]
// Check the flags
if d.Session != id {
t.Fatalf("bad: %v", d)
}
// Release the lock
req, err = http.NewRequest("PUT",
"/v1/kv/test?release="+id, bytes.NewReader(nil))
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
// Verify we do not have the lock
req, err = http.NewRequest("GET", "/v1/kv/test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
d = obj.(structs.DirEntries)[0]
// Check the flags
if d.Session != "" {
t.Fatalf("bad: %v", d)
}
})
}

View File

@ -1,8 +1,8 @@
package agent
import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"reflect"
"testing"

View File

@ -1,15 +1,15 @@
package agent
import (
"errors"
"fmt"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
"io"
"net"
"os"
"strings"
"testing"
"errors"
"time"
)

View File

@ -0,0 +1,185 @@
package agent
import (
"fmt"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"net/http"
"strings"
"time"
)
const (
// lockDelayMinThreshold is used to convert a numeric lock
// delay value from nanoseconds to seconds if it is below this
// threshold. Users often send a value like 5, which they assume
// is seconds, but because Go uses nanosecond granularity, ends
// up being very small. If we see a value below this threshold,
// we multply by time.Second
lockDelayMinThreshold = 1000
)
// sessionCreateResponse is used to wrap the session ID
type sessionCreateResponse struct {
ID string
}
// SessionCreate is used to create a new session
func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Mandate a PUT request
if req.Method != "PUT" {
resp.WriteHeader(405)
return nil, nil
}
// Default the session to our node + serf check
args := structs.SessionRequest{
Op: structs.SessionCreate,
Session: structs.Session{
Node: s.agent.config.NodeName,
Checks: []string{consul.SerfCheckID},
LockDelay: 15 * time.Second,
},
}
s.parseDC(req, &args.Datacenter)
// Handle optional request body
if req.ContentLength > 0 {
if err := decodeBody(req, &args.Session, FixupLockDelay); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
return nil, nil
}
}
// Create the session, get the ID
var out string
if err := s.agent.RPC("Session.Apply", &args, &out); err != nil {
return nil, err
}
// Format the response as a JSON object
return sessionCreateResponse{out}, nil
}
// FixupLockDelay is used to handle parsing the JSON body to session/create
// and properly parsing out the lock delay duration value.
func FixupLockDelay(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
var key string
for k, _ := range rawMap {
if strings.ToLower(k) == "lockdelay" {
key = k
break
}
}
if key != "" {
val := rawMap[key]
// Convert a string value into an integer
if vStr, ok := val.(string); ok {
dur, err := time.ParseDuration(vStr)
if err != nil {
return err
}
if dur < lockDelayMinThreshold {
dur = dur * time.Second
}
rawMap[key] = dur
}
// Convert low value integers into seconds
if vNum, ok := val.(float64); ok {
dur := time.Duration(vNum)
if dur < lockDelayMinThreshold {
dur = dur * time.Second
}
rawMap[key] = dur
}
}
return nil
}
// SessionDestroy is used to destroy an existing session
func (s *HTTPServer) SessionDestroy(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.SessionRequest{
Op: structs.SessionDestroy,
}
s.parseDC(req, &args.Datacenter)
// Pull out the session id
args.Session.ID = strings.TrimPrefix(req.URL.Path, "/v1/session/destroy/")
if args.Session.ID == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing session"))
return nil, nil
}
var out string
if err := s.agent.RPC("Session.Apply", &args, &out); err != nil {
return nil, err
}
return true, nil
}
// SessionGet is used to get info for a particular session
func (s *HTTPServer) SessionGet(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.SessionSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Pull out the session id
args.Session = strings.TrimPrefix(req.URL.Path, "/v1/session/info/")
if args.Session == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing session"))
return nil, nil
}
var out structs.IndexedSessions
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Session.Get", &args, &out); err != nil {
return nil, err
}
return out.Sessions, nil
}
// SessionList is used to list all the sessions
func (s *HTTPServer) SessionList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.DCSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var out structs.IndexedSessions
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Session.List", &args, &out); err != nil {
return nil, err
}
return out.Sessions, nil
}
// SessionsForNode returns all the nodes belonging to a node
func (s *HTTPServer) SessionsForNode(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.NodeSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Pull out the node name
args.Node = strings.TrimPrefix(req.URL.Path, "/v1/session/node/")
if args.Node == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing node name"))
return nil, nil
}
var out structs.IndexedSessions
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Session.NodeSessions", &args, &out); err != nil {
return nil, err
}
return out.Sessions, nil
}

View File

@ -0,0 +1,188 @@
package agent
import (
"bytes"
"encoding/json"
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestSessionCreate(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Create a health check
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
Check: &structs.HealthCheck{
CheckID: "consul",
Node: srv.agent.config.NodeName,
Name: "consul",
ServiceID: "consul",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Associate session with node and 2 health checks
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Node": srv.agent.config.NodeName,
"Checks": []string{consul.SerfCheckID, "consul"},
"LockDelay": "20s",
}
enc.Encode(raw)
req, err := http.NewRequest("PUT", "/v1/session/create", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := obj.(sessionCreateResponse); !ok {
t.Fatalf("should work")
}
})
}
func TestFixupLockDelay(t *testing.T) {
inp := map[string]interface{}{
"lockdelay": float64(15),
}
if err := FixupLockDelay(inp); err != nil {
t.Fatalf("err: %v", err)
}
if inp["lockdelay"] != 15*time.Second {
t.Fatalf("bad: %v", inp)
}
inp = map[string]interface{}{
"lockDelay": float64(15 * time.Second),
}
if err := FixupLockDelay(inp); err != nil {
t.Fatalf("err: %v", err)
}
if inp["lockDelay"] != 15*time.Second {
t.Fatalf("bad: %v", inp)
}
inp = map[string]interface{}{
"LockDelay": "15s",
}
if err := FixupLockDelay(inp); err != nil {
t.Fatalf("err: %v", err)
}
if inp["LockDelay"] != 15*time.Second {
t.Fatalf("bad: %v", inp)
}
}
func makeTestSession(t *testing.T, srv *HTTPServer) string {
req, err := http.NewRequest("PUT", "/v1/session/create", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
sessResp := obj.(sessionCreateResponse)
return sessResp.ID
}
func TestSessionDestroy(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
id := makeTestSession(t, srv)
req, err := http.NewRequest("PUT", "/v1/session/destroy/"+id, nil)
resp := httptest.NewRecorder()
obj, err := srv.SessionDestroy(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp := obj.(bool); !resp {
t.Fatalf("should work")
}
})
}
func TestSessionGet(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
id := makeTestSession(t, srv)
req, err := http.NewRequest("GET",
"/v1/session/info/"+id, nil)
resp := httptest.NewRecorder()
obj, err := srv.SessionGet(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
respObj, ok := obj.(structs.Sessions)
if !ok {
t.Fatalf("should work")
}
if len(respObj) != 1 {
t.Fatalf("bad: %v", respObj)
}
})
}
func TestSessionList(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
var ids []string
for i := 0; i < 10; i++ {
ids = append(ids, makeTestSession(t, srv))
}
req, err := http.NewRequest("GET", "/v1/session/list", nil)
resp := httptest.NewRecorder()
obj, err := srv.SessionList(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
respObj, ok := obj.(structs.Sessions)
if !ok {
t.Fatalf("should work")
}
if len(respObj) != 10 {
t.Fatalf("bad: %v", respObj)
}
})
}
func TestSessionsForNode(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
var ids []string
for i := 0; i < 10; i++ {
ids = append(ids, makeTestSession(t, srv))
}
req, err := http.NewRequest("GET",
"/v1/session/node/"+srv.agent.config.NodeName, nil)
resp := httptest.NewRecorder()
obj, err := srv.SessionsForNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
respObj, ok := obj.(structs.Sessions)
if !ok {
t.Fatalf("should work")
}
if len(respObj) != 10 {
t.Fatalf("bad: %v", respObj)
}
})
}

View File

@ -1,9 +1,9 @@
package agent
import (
"github.com/hashicorp/consul/testutil"
"os"
"testing"
"github.com/hashicorp/consul/testutil"
)
func TestStatusLeader(t *testing.T) {

View File

@ -3,8 +3,8 @@ package agent
import (
"bytes"
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"io"
"io/ioutil"
"net/http"

View File

@ -1,13 +1,13 @@
package command
import (
"errors"
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
"strings"
"testing"
"errors"
)
func TestForceLeaveCommand_implements(t *testing.T) {

View File

@ -498,7 +498,7 @@ func TestCatalogListServices_Blocking(t *testing.T) {
}
// Should block at least 100ms
if time.Now().Sub(start) < 100 * time.Millisecond {
if time.Now().Sub(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}
@ -544,7 +544,7 @@ func TestCatalogListServices_Timeout(t *testing.T) {
}
// Should block at least 100ms
if time.Now().Sub(start) < 100 * time.Millisecond {
if time.Now().Sub(start) < 100*time.Millisecond {
// TODO: Failing
t.Fatalf("too fast")
}

View File

@ -2,8 +2,8 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net"
"os"
"testing"

View File

@ -67,6 +67,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index)
case structs.SessionRequestType:
return c.applySessionOperation(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
@ -152,6 +154,20 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
} else {
return act
}
case structs.KVSLock:
act, err := c.state.KVSLock(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
case structs.KVSUnlock:
act, err := c.state.KVSUnlock(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
default:
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
@ -159,6 +175,27 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
return nil
}
func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
return err
} else {
return req.Session.ID
}
case structs.SessionDestroy:
return c.state.SessionDestroy(index, req.Session.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Session operation '%s'", req.Op)
return fmt.Errorf("Invalid Session operation '%s'", req.Op)
}
return nil
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
@ -222,6 +259,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}
case structs.SessionRequestType:
var req structs.Session
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.SessionRestore(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -244,6 +290,25 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
return err
}
if err := s.persistNodes(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistSessions(sink, encoder); err != nil {
sink.Cancel()
return err
}
if err := s.persistKV(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes := s.state.Nodes()
@ -258,7 +323,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register the node itself
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
@ -268,7 +332,6 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
req.Service = srv
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
@ -280,16 +343,31 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
req.Check = check
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
sink.Cancel()
return err
}
}
}
return nil
}
// Enable GC of the ndoes
nodes = nil
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
sessions, err := s.state.SessionList()
if err != nil {
return err
}
// Dump the KVS entries
for _, s := range sessions {
sink.Write([]byte{byte(structs.SessionRequestType)})
if err := encoder.Encode(s); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) persistKV(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
@ -298,25 +376,21 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
}
}()
OUTER:
for {
select {
case raw := <-streamCh:
if raw == nil {
break OUTER
return nil
}
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(raw); err != nil {
sink.Cancel()
return err
}
case err := <-errorCh:
sink.Cancel()
return err
}
}
return nil
}

View File

@ -326,6 +326,8 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Key: "/test",
Value: []byte("foo"),
})
session := &structs.Session{Node: "foo"}
fsm.state.SessionCreate(9, session)
// Snapshot
snap, err := fsm.Snapshot()
@ -383,6 +385,15 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if string(d.Value) != "foo" {
t.Fatalf("bad: %v", d)
}
// Verify session is restored
_, s, err := fsm.state.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s.Node != "foo" {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_KVSSet(t *testing.T) {
@ -569,3 +580,190 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureCheck(2, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Status: structs.HealthPassing,
})
// Create a new session
req := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
Checks: []string{"web"},
},
}
buf, err := structs.Encode(structs.SessionRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Get the session
id := resp.(string)
_, session, err := fsm.state.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if session == nil {
t.Fatalf("missing")
}
// Verify the session
if session.ID != id {
t.Fatalf("bad: %v", *session)
}
if session.Node != "foo" {
t.Fatalf("bad: %v", *session)
}
if session.Checks[0] != "web" {
t.Fatalf("bad: %v", *session)
}
// Try to destroy
destroy := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionDestroy,
Session: structs.Session{
ID: id,
},
}
buf, err = structs.Encode(structs.SessionRequestType, destroy)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
_, session, err = fsm.state.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if session != nil {
t.Fatalf("should be destroyed")
}
}
func TestFSM_KVSLock(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is locked
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != session.ID {
t.Fatalf("bad: %v", *d)
}
}
func TestFSM_KVSUnlock(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
req = structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is unlocked
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != "" {
t.Fatalf("bad: %v", *d)
}
}

View File

@ -1,8 +1,8 @@
package consul
import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
)

View File

@ -1,8 +1,8 @@
package consul
import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
)

View File

@ -25,6 +25,23 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return fmt.Errorf("Must provide key")
}
// If this is a lock, we must check for a lock-delay. Since lock-delay
// is based on wall-time, each peer expire the lock-delay at a slightly
// different time. This means the enforcement of lock-delay cannot be done
// after the raft log is committed as it would lead to inconsistent FSMs.
// Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies.
if args.Op == structs.KVSLock {
state := k.srv.fsm.State()
expires := state.KVSLockDelay(args.DirEnt.Key)
if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
args.DirEnt.Key, expires)
*reply = false
return nil
}
}
// Apply the update
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {

View File

@ -1,10 +1,11 @@
package consul
import (
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
"time"
)
func TestKVS_Apply(t *testing.T) {
@ -224,5 +225,72 @@ func TestKVSEndpoint_ListKeys(t *testing.T) {
if dirent.Keys[2] != "/test/sub/" {
t.Fatalf("Bad: %v", dirent.Keys)
}
}
func TestKVS_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Create and invalidate a session with a lock
state := s1.fsm.State()
if err := state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != false {
t.Fatalf("should not acquire")
}
// Wait for lock-delay
time.Sleep(50 * time.Millisecond)
// Should acquire
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if out != true {
t.Fatalf("should acquire")
}
}

View File

@ -1,13 +1,13 @@
package consul
import (
"errors"
"fmt"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/serf"
"os"
"testing"
"errors"
"time"
)

View File

@ -63,6 +63,7 @@ type MDBTxn struct {
readonly bool
tx *mdb.Txn
dbis map[string]mdb.DBI
after []func()
}
// Abort is used to close the transaction
@ -74,7 +75,19 @@ func (t *MDBTxn) Abort() {
// Commit is used to commit a transaction
func (t *MDBTxn) Commit() error {
return t.tx.Commit()
if err := t.tx.Commit(); err != nil {
return err
}
for _, f := range t.after {
f()
}
t.after = nil
return nil
}
// Defer is used to defer a function call until a successful commit
func (t *MDBTxn) Defer(f func()) {
t.after = append(t.after, f)
}
type IndexFunc func(*MDBIndex, []string) string
@ -734,6 +747,19 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error {
return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0)
}
// SetMaxLastIndexTxn is used to set the last index within a transaction
// if it exceeds the current maximum
func (t *MDBTable) SetMaxLastIndexTxn(tx *MDBTxn, index uint64) error {
current, err := t.LastIndexTxn(tx)
if err != nil {
return err
}
if index > current {
return t.SetLastIndexTxn(tx, index)
}
return nil
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn

View File

@ -108,6 +108,7 @@ type endpoints struct {
Raft *Raft
Status *Status
KVS *KVS
Session *Session
Internal *Internal
}
@ -316,6 +317,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error {
s.endpoints.Catalog = &Catalog{s}
s.endpoints.Health = &Health{s}
s.endpoints.KVS = &KVS{s}
s.endpoints.Session = &Session{s}
s.endpoints.Internal = &Internal{s}
// Register the handlers
@ -324,6 +326,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error {
s.rpcServer.Register(s.endpoints.Catalog)
s.rpcServer.Register(s.endpoints.Health)
s.rpcServer.Register(s.endpoints.KVS)
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Internal)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)

View File

@ -1,6 +1,7 @@
package consul
import (
"errors"
"fmt"
"github.com/hashicorp/consul/testutil"
"io/ioutil"
@ -8,7 +9,6 @@ import (
"os"
"testing"
"time"
"errors"
)
var nextPort = 15000
@ -293,7 +293,7 @@ func TestServer_JoinLAN_TLS(t *testing.T) {
// Verify Raft has established a peer
testutil.WaitForResult(func() (bool, error) {
return s1.Stats()["raft"]["num_peers"] == "1", nil
return s1.Stats()["raft"]["num_peers"] == "1", nil
}, func(err error) {
t.Fatalf("no peer established")
})

106
consul/session_endpoint.go Normal file
View File

@ -0,0 +1,106 @@
package consul
import (
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"time"
)
// Session endpoint is used to manipulate sessions for KV
type Session struct {
srv *Server
}
// Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data
func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
if done, err := s.srv.forward("Session.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "session", "apply"}, time.Now())
// Verify the args
if args.Session.ID == "" && args.Op == structs.SessionDestroy {
return fmt.Errorf("Must provide ID")
}
if args.Session.Node == "" && args.Op == structs.SessionCreate {
return fmt.Errorf("Must provide Node")
}
// Apply the update
resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil {
s.srv.logger.Printf("[ERR] consul.session: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a string
if respString, ok := resp.(string); ok {
*reply = respString
}
return nil
}
// Get is used to retrieve a single session
func (s *Session) Get(args *structs.SessionSpecificRequest,
reply *structs.IndexedSessions) error {
if done, err := s.srv.forward("Session.Get", args, args, reply); done {
return err
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionGet"),
func() error {
index, session, err := state.SessionGet(args.Session)
reply.Index = index
if session != nil {
reply.Sessions = structs.Sessions{session}
}
return err
})
}
// List is used to list all the active sessions
func (s *Session) List(args *structs.DCSpecificRequest,
reply *structs.IndexedSessions) error {
if done, err := s.srv.forward("Session.List", args, args, reply); done {
return err
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("SessionList"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.SessionList()
return err
})
}
// NodeSessions is used to get all the sessions for a particular node
func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
reply *structs.IndexedSessions) error {
if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done {
return err
}
// Get the local state
state := s.srv.fsm.State()
return s.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeSessions"),
func() error {
var err error
reply.Index, reply.Sessions, err = state.NodeSessions(args.Node)
return err
})
}

View File

@ -0,0 +1,212 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"os"
"testing"
)
func TestSessionEndpoint_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
// Verify
state := s1.fsm.State()
_, s, err := state.SessionGet(out)
if err != nil {
t.Fatalf("err: %v", err)
}
if s == nil {
t.Fatalf("should not be nil")
}
// Do a delete
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify
_, s, err = state.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if s != nil {
t.Fatalf("bad: %v", s)
}
}
func TestSessionEndpoint_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
getR := structs.SessionSpecificRequest{
Datacenter: "dc1",
Session: out,
}
var sessions structs.IndexedSessions
if err := client.Call("Session.Get", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if sessions.Index == 0 {
t.Fatalf("Bad: %v", sessions)
}
if len(sessions.Sessions) != 1 {
t.Fatalf("Bad: %v", sessions)
}
s := sessions.Sessions[0]
if s.ID != out {
t.Fatalf("bad: %v", s)
}
}
func TestSessionEndpoint_List(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
ids := []string{}
for i := 0; i < 5; i++ {
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
ids = append(ids, out)
}
getR := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var sessions structs.IndexedSessions
if err := client.Call("Session.List", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if sessions.Index == 0 {
t.Fatalf("Bad: %v", sessions)
}
if len(sessions.Sessions) != 5 {
t.Fatalf("Bad: %v", sessions.Sessions)
}
for i := 0; i < len(sessions.Sessions); i++ {
s := sessions.Sessions[i]
if !strContains(ids, s.ID) {
t.Fatalf("bad: %v", s)
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
}
}
func TestSessionEndpoint_NodeSessions(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{"bar", "127.0.0.1"})
ids := []string{}
for i := 0; i < 10; i++ {
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "bar",
},
}
if i < 5 {
arg.Session.Node = "foo"
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if i < 5 {
ids = append(ids, out)
}
}
getR := structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "foo",
}
var sessions structs.IndexedSessions
if err := client.Call("Session.NodeSessions", &getR, &sessions); err != nil {
t.Fatalf("err: %v", err)
}
if sessions.Index == 0 {
t.Fatalf("Bad: %v", sessions)
}
if len(sessions.Sessions) != 5 {
t.Fatalf("Bad: %v", sessions.Sessions)
}
for i := 0; i < len(sessions.Sessions); i++ {
s := sessions.Sessions[i]
if !strContains(ids, s.ID) {
t.Fatalf("bad: %v", s)
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
}
}

View File

@ -10,6 +10,8 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"
)
const (
@ -17,10 +19,23 @@ const (
dbServices = "services"
dbChecks = "checks"
dbKVS = "kvs"
dbSessions = "sessions"
dbSessionChecks = "sessionChecks"
dbMaxMapSize32bit uint64 = 512 * 1024 * 1024 // 512MB maximum size
dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size
)
// kvMode is used internally to control which type of set
// operation we are performing
type kvMode int
const (
kvSet kvMode = iota
kvCAS
kvLock
kvUnlock
)
// The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide
@ -29,16 +44,34 @@ const (
// implementation uses the Lightning Memory-Mapped Database (MDB).
// This gives us Multi-Version Concurrency Control for "free"
type StateStore struct {
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
logger *log.Logger
path string
env *mdb.Env
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
sessionTable *MDBTable
sessionCheckTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
// lockDelay is used to mark certain locks as unacquirable.
// When a lock is forcefully released (failing health
// check, destroyed session, etc), it is subject to the LockDelay
// impossed by the session. This prevents another session from
// acquiring the lock for some period of time as a protection against
// split-brains. This is inspired by the lock-delay in Chubby.
// Because this relies on wall-time, we cannot assume all peers
// perceive time as flowing uniformly. This means KVSLock MUST ignore
// lockDelay, since the lockDelay may have expired on the leader,
// but not on the follower. Rejecting the lock could result in
// inconsistencies in the FSMs due to the rate time progresses. Instead,
// only the opinion of the leader is respected, and the Raft log
// is never questioned.
lockDelay map[string]time.Time
lockDelayLock sync.RWMutex
}
// StateSnapshot is used to provide a point-in-time snapshot
@ -49,6 +82,15 @@ type StateSnapshot struct {
lastIndex uint64
}
// sessionCheck is used to create a many-to-one table such
// that each check registered by a session can be mapped back
// to the session row.
type sessionCheck struct {
Node string
CheckID string
Session string
}
// Close is used to abort the transaction and allow for cleanup
func (s *StateSnapshot) Close() error {
s.tx.Abort()
@ -70,10 +112,11 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
}
s := &StateStore{
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
env: env,
watch: make(map[*MDBTable]*NotifyGroup),
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
env: env,
watch: make(map[*MDBTable]*NotifyGroup),
lockDelay: make(map[string]time.Time),
}
// Ensure we can initialize
@ -209,6 +252,10 @@ func (s *StateStore) initialize() error {
Fields: []string{"Key"},
IdxFunc: DefaultIndexPrefixFunc,
},
"session": &MDBIndex{
AllowBlank: true,
Fields: []string{"Session"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.DirEntry)
@ -219,8 +266,47 @@ func (s *StateStore) initialize() error {
},
}
s.sessionTable = &MDBTable{
Name: dbSessions,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"ID"},
},
"node": &MDBIndex{
AllowBlank: true,
Fields: []string{"Node"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.Session)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
s.sessionCheckTable = &MDBTable{
Name: dbSessionChecks,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Node", "CheckID", "Session"},
},
},
Decoder: func(buf []byte) interface{} {
out := new(sessionCheck)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
// Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable}
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
s.kvsTable, s.sessionTable, s.sessionCheckTable}
for _, table := range s.tables {
table.Env = s.env
table.Encoder = encoder
@ -247,6 +333,9 @@ func (s *StateStore) initialize() error {
"KVSGet": MDBTables{s.kvsTable},
"KVSList": MDBTables{s.kvsTable},
"KVSListKeys": MDBTables{s.kvsTable},
"SessionGet": MDBTables{s.sessionTable},
"SessionList": MDBTables{s.sessionTable},
"NodeSessions": MDBTables{s.sessionTable},
}
return nil
}
@ -278,7 +367,7 @@ func (s *StateStore) EnsureNode(index uint64, node structs.Node) error {
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.nodeTable].Notify()
tx.Defer(func() { s.watch[s.nodeTable].Notify() })
return tx.Commit()
}
@ -311,8 +400,7 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes) {
// EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error {
tables := MDBTables{s.nodeTable, s.serviceTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -343,7 +431,7 @@ func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeSe
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.serviceTable].Notify()
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
return tx.Commit()
}
@ -406,8 +494,7 @@ func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string
// DeleteNodeService is used to delete a node service
func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
tables := MDBTables{s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -419,35 +506,52 @@ func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.serviceTable].Notify()
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
}
// Invalidate any sessions using these checks
checks, err := s.checkTable.GetTxn(tx, "node", node, id)
if err != nil {
return err
}
for _, c := range checks {
check := c.(*structs.HealthCheck)
if err := s.invalidateCheck(index, tx, node, check.CheckID); err != nil {
return err
}
}
if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil {
return err
} else if n > 0 {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.checkTable].Notify()
tx.Defer(func() { s.watch[s.checkTable].Notify() })
}
return tx.Commit()
}
// DeleteNode is used to delete a node and all it's services
func (s *StateStore) DeleteNode(index uint64, node string) error {
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Invalidate any sessions held by the node
if err := s.invalidateNode(index, tx, node); err != nil {
return err
}
if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil {
return err
} else if n > 0 {
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.serviceTable].Notify()
tx.Defer(func() { s.watch[s.serviceTable].Notify() })
}
if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil {
return err
@ -455,7 +559,7 @@ func (s *StateStore) DeleteNode(index uint64, node string) error {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.checkTable].Notify()
tx.Defer(func() { s.watch[s.checkTable].Notify() })
}
if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil {
return err
@ -463,7 +567,7 @@ func (s *StateStore) DeleteNode(index uint64, node string) error {
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.nodeTable].Notify()
tx.Defer(func() { s.watch[s.nodeTable].Notify() })
}
return tx.Commit()
}
@ -578,8 +682,7 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
}
// Start the txn
tables := MDBTables{s.nodeTable, s.serviceTable, s.checkTable}
tx, err := tables.StartTxn(false)
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
@ -608,6 +711,14 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
check.ServiceName = srv.ServiceName
}
// Invalidate any sessions if status is critical
if check.Status == structs.HealthCritical {
err := s.invalidateCheck(index, tx, check.Node, check.CheckID)
if err != nil {
return err
}
}
// Ensure the check is set
if err := s.checkTable.InsertTxn(tx, check); err != nil {
return err
@ -615,25 +726,30 @@ func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.checkTable].Notify()
tx.Defer(func() { s.watch[s.checkTable].Notify() })
return tx.Commit()
}
// DeleteNodeCheck is used to delete a node health check
func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error {
tx, err := s.checkTable.StartTxn(false, nil)
tx, err := s.tables.StartTxn(false)
if err != nil {
return err
}
defer tx.Abort()
// Invalidate any sessions held by this check
if err := s.invalidateCheck(index, tx, node, id); err != nil {
return err
}
if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil {
return err
} else if n > 0 {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.checkTable].Notify()
tx.Defer(func() { s.watch[s.checkTable].Notify() })
}
return tx.Commit()
}
@ -837,35 +953,8 @@ func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) str
// KVSSet is used to create or update a KV entry
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
// Get the existing node
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
if err != nil {
return err
}
// Set the create and modify times
if len(res) == 0 {
d.CreateIndex = index
} else {
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
}
d.ModifyIndex = index
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return err
}
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.kvsTable].Notify()
return tx.Commit()
_, err := s.kvsSet(index, d, kvSet)
return err
}
// KVSRestore is used to restore a DirEntry. It should only be used when
@ -986,15 +1075,44 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts .
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.kvsTable].Notify()
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return tx.Commit()
}
// KVSCheckAndSet is used to perform an atomic check-and-set
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvCAS)
}
// KVSLock works like KVSSet but only writes if the lock can be acquired
func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvLock)
}
// KVSUnlock works like KVSSet but only writes if the lock can be unlocked
func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) {
return s.kvsSet(index, d, kvUnlock)
}
// KVSLockDelay returns the expiration time of a key lock delay. A key may
// have a lock delay if it was unlocked due to a session invalidation instead
// of a graceful unlock. This must be checked on the leader node, and not in
// KVSLock due to the variability of clocks.
func (s *StateStore) KVSLockDelay(key string) time.Time {
s.lockDelayLock.RLock()
expires := s.lockDelay[key]
s.lockDelayLock.RUnlock()
return expires
}
// kvsSet is the internal setter
func (s *StateStore) kvsSet(
index uint64,
d *structs.DirEntry,
mode kvMode) (bool, error) {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
tx, err := s.tables.StartTxn(false)
if err != nil {
return false, err
}
@ -1015,10 +1133,51 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
// Use the ModifyIndex as the constraint. A modify of time of 0
// means we are doing a set-if-not-exists, while any other value
// means we expect that modify time.
if d.ModifyIndex == 0 && exist != nil {
return false, nil
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
return false, nil
if mode == kvCAS {
if d.ModifyIndex == 0 && exist != nil {
return false, nil
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
return false, nil
}
}
// If attempting to lock, check this is possible
if mode == kvLock {
// Verify we have a session
if d.Session == "" {
return false, fmt.Errorf("Missing session")
}
// Bail if it is already locked
if exist != nil && exist.Session != "" {
return false, nil
}
// Verify the session exists
res, err := s.sessionTable.GetTxn(tx, "id", d.Session)
if err != nil {
return false, err
}
if len(res) == 0 {
return false, fmt.Errorf("Invalid session")
}
// Update the lock index
if exist != nil {
exist.LockIndex++
exist.Session = d.Session
} else {
d.LockIndex = 1
}
}
// If attempting to unlock, verify the key exists and is held
if mode == kvUnlock {
if exist == nil || exist.Session != d.Session {
return false, nil
}
// Clear the session to unlock
exist.Session = ""
}
// Set the create and modify times
@ -1026,6 +1185,9 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
d.CreateIndex = index
} else {
d.CreateIndex = exist.CreateIndex
d.LockIndex = exist.LockIndex
d.Session = exist.Session
}
d.ModifyIndex = index
@ -1035,10 +1197,281 @@ func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, er
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return false, err
}
defer s.watch[s.kvsTable].Notify()
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
return true, tx.Commit()
}
// SessionCreate is used to create a new session. The
// ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
// Assign the create index
session.CreateIndex = index
// Start the transaction
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Verify that the node exists
res, err := s.nodeTable.GetTxn(tx, "id", session.Node)
if err != nil {
return err
}
if len(res) == 0 {
return fmt.Errorf("Missing node registration")
}
// Verify that the checks exist and are not critical
for _, checkId := range session.Checks {
res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId)
if err != nil {
return err
}
if len(res) == 0 {
return fmt.Errorf("Missing check '%s' registration", checkId)
}
chk := res[0].(*structs.HealthCheck)
if chk.Status == structs.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status)
}
}
// Generate a new session ID, verify uniqueness
session.ID = generateUUID()
for {
res, err = s.sessionTable.GetTxn(tx, "id", session.ID)
if err != nil {
return err
}
// Quit if this ID is unique
if len(res) == 0 {
break
}
}
// Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err
}
// Insert the check mappings
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
for _, checkID := range session.Checks {
sCheck.CheckID = checkID
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
return err
}
}
// Trigger the update notifications
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
return tx.Commit()
}
// SessionRestore is used to restore a session. It should only be used when
// doing a restore, otherwise SessionCreate should be used.
func (s *StateStore) SessionRestore(session *structs.Session) error {
// Start the transaction
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
// Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err
}
// Insert the check mappings
sCheck := sessionCheck{Node: session.Node, Session: session.ID}
for _, checkID := range session.Checks {
sCheck.CheckID = checkID
if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil {
return err
}
}
// Trigger the update notifications
index := session.CreateIndex
if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
return tx.Commit()
}
// SessionGet is used to get a session entry
func (s *StateStore) SessionGet(id string) (uint64, *structs.Session, error) {
idx, res, err := s.sessionTable.Get("id", id)
var d *structs.Session
if len(res) > 0 {
d = res[0].(*structs.Session)
}
return idx, d, err
}
// SessionList is used to list all the open sessions
func (s *StateStore) SessionList() (uint64, []*structs.Session, error) {
idx, res, err := s.sessionTable.Get("id")
out := make([]*structs.Session, len(res))
for i, raw := range res {
out[i] = raw.(*structs.Session)
}
return idx, out, err
}
// NodeSessions is used to list all the open sessions for a node
func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, error) {
idx, res, err := s.sessionTable.Get("node", node)
out := make([]*structs.Session, len(res))
for i, raw := range res {
out[i] = raw.(*structs.Session)
}
return idx, out, err
}
// SessionDelete is used to destroy a session.
func (s *StateStore) SessionDestroy(index uint64, id string) error {
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
if err := s.invalidateSession(index, tx, id); err != nil {
return err
}
return tx.Commit()
}
// invalideNode is used to invalide all sessions belonging to a node
// All tables should be locked in the tx.
func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error {
sessions, err := s.sessionTable.GetTxn(tx, "node", node)
if err != nil {
return err
}
for _, sess := range sessions {
session := sess.(*structs.Session).ID
if err := s.invalidateSession(index, tx, session); err != nil {
return err
}
}
return nil
}
// invalidateCheck is used to invalide all sessions belonging to a check
// All tables should be locked in the tx.
func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check string) error {
sessionChecks, err := s.sessionCheckTable.GetTxn(tx, "id", node, check)
if err != nil {
return err
}
for _, sc := range sessionChecks {
session := sc.(*sessionCheck).Session
if err := s.invalidateSession(index, tx, session); err != nil {
return err
}
}
return nil
}
// invalidateSession is used to invalide a session within a given txn
// All tables should be locked in the tx.
func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) error {
// Get the session
res, err := s.sessionTable.GetTxn(tx, "id", id)
if err != nil {
return err
}
// Quit if this session does not exist
if len(res) == 0 {
return nil
}
session := res[0].(*structs.Session)
// Enforce the MaxLockDelay
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Invalidate any held locks
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err
}
// Nuke the session
if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil {
return err
}
// Delete the check mappings
for _, checkID := range session.Checks {
if _, err := s.sessionCheckTable.DeleteTxn(tx, "id",
session.Node, checkID, id); err != nil {
return err
}
}
// Trigger the update notifications
if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.sessionTable].Notify() })
return nil
}
// invalidateLocks is used to invalidate all the locks held by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
lockDelay time.Duration, id string) error {
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
if err != nil {
return err
}
var expires time.Time
if lockDelay > 0 {
s.lockDelayLock.Lock()
defer s.lockDelayLock.Unlock()
expires = time.Now().Add(lockDelay)
}
for _, pair := range pairs {
kv := pair.(*structs.DirEntry)
kv.Session = "" // Clear the lock
kv.ModifyIndex = index // Update the modified time
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
return err
}
// If there is a lock delay, prevent acquisition
// for at least lockDelay period
if lockDelay > 0 {
s.lockDelay[kv.Key] = expires
time.AfterFunc(lockDelay, func() {
s.lockDelayLock.Lock()
delete(s.lockDelay, kv.Key)
s.lockDelayLock.Unlock()
})
}
}
if len(pairs) > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return nil
}
// Snapshot is used to create a point in time snapshot
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// Begin a new txn on all tables
@ -1102,3 +1535,13 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
}
// SessionList is used to list all the open sessions
func (s *StateSnapshot) SessionList() ([]*structs.Session, error) {
res, err := s.store.sessionTable.GetTxn(s.tx, "id")
out := make([]*structs.Session, len(res))
for i, raw := range res {
out[i] = raw.(*structs.Session)
}
return out, err
}

View File

@ -6,6 +6,7 @@ import (
"reflect"
"sort"
"testing"
"time"
)
func testStateStore() (*StateStore, error) {
@ -636,6 +637,21 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Add some sessions
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(16, session); err != nil {
t.Fatalf("err: %v", err)
}
session = &structs.Session{Node: "bar"}
if err := store.SessionCreate(17, session); err != nil {
t.Fatalf("err: %v", err)
}
d.Session = session.ID
if ok, err := store.KVSLock(18, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
// Take a snapshot
snap, err := store.Snapshot()
if err != nil {
@ -644,7 +660,7 @@ func TestStoreSnapshot(t *testing.T) {
defer snap.Close()
// Check the last nodes
if idx := snap.LastIndex(); idx != 15 {
if idx := snap.LastIndex(); idx != 18 {
t.Fatalf("bad: %v", idx)
}
@ -699,14 +715,23 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("missing KVS entries!")
}
// Check there are 2 sessions
sessions, err := snap.SessionList()
if err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions) != 2 {
t.Fatalf("missing sessions")
}
// Make some changes!
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil {
if err := store.EnsureService(19, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil {
if err := store.EnsureService(20, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureNode(16, structs.Node{"baz", "127.0.0.3"}); err != nil {
if err := store.EnsureNode(21, structs.Node{"baz", "127.0.0.3"}); err != nil {
t.Fatalf("err: %v", err)
}
checkAfter := &structs.HealthCheck{
@ -716,12 +741,12 @@ func TestStoreSnapshot(t *testing.T) {
Status: structs.HealthCritical,
ServiceID: "db",
}
if err := store.EnsureCheck(17, checkAfter); err != nil {
t.Fatalf("err: %v")
if err := store.EnsureCheck(22, checkAfter); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.KVSDelete(18, "/web/a"); err != nil {
t.Fatalf("err: %v")
if err := store.KVSDelete(23, "/web/b"); err != nil {
t.Fatalf("err: %v", err)
}
// Check snapshot has old values
@ -773,6 +798,15 @@ func TestStoreSnapshot(t *testing.T) {
if len(ents) != 2 {
t.Fatalf("missing KVS entries!")
}
// Check there are 2 sessions
sessions, err = snap.SessionList()
if err != nil {
t.Fatalf("err: %v", err)
}
if len(sessions) != 2 {
t.Fatalf("missing sessions")
}
}
func TestEnsureCheck(t *testing.T) {
@ -1561,3 +1595,511 @@ func TestKVSDeleteTree(t *testing.T) {
t.Fatalf("bad: %v", ents)
}
}
func TestSessionCreate(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(1000, session); err != nil {
t.Fatalf("err: %v", err)
}
if session.ID == "" {
t.Fatalf("bad: %v", session)
}
if session.CreateIndex != 1000 {
t.Fatalf("bad: %v", session)
}
}
func TestSessionCreate_Invalid(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// No node registered
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(1000, session); err.Error() != "Missing node registration" {
t.Fatalf("err: %v", err)
}
// Check not registered
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.SessionCreate(1000, session); err.Error() != "Missing check 'bar' registration" {
t.Fatalf("err: %v", err)
}
// Unhealthy check
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthCritical,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.SessionCreate(1000, session); err.Error() != "Check 'bar' is in critical state" {
t.Fatalf("err: %v", err)
}
}
func TestSession_Lookups(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create a session
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
}
if err := store.SessionCreate(1000, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID
idx, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1000 {
t.Fatalf("bad: %v", idx)
}
if !reflect.DeepEqual(s2, session) {
t.Fatalf("bad: %v", s2)
}
// Create many sessions
ids := []string{session.ID}
for i := 0; i < 10; i++ {
session := &structs.Session{
Node: "foo",
}
if err := store.SessionCreate(uint64(1000+i), session); err != nil {
t.Fatalf("err: %v", err)
}
ids = append(ids, session.ID)
}
// List all
idx, all, err := store.SessionList()
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1009 {
t.Fatalf("bad: %v", idx)
}
// Retrieve the ids
var out []string
for _, s := range all {
out = append(out, s.ID)
}
sort.Strings(ids)
sort.Strings(out)
if !reflect.DeepEqual(ids, out) {
t.Fatalf("bad: %v %v", ids, out)
}
// List by node
idx, nodes, err := store.NodeSessions("foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1009 {
t.Fatalf("bad: %v", idx)
}
// Check again for the node list
out = nil
for _, s := range nodes {
out = append(out, s.ID)
}
sort.Strings(out)
if !reflect.DeepEqual(ids, out) {
t.Fatalf("bad: %v %v", ids, out)
}
}
func TestSessionInvalidate_CriticalHealthCheck(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Invalidate the check
check.Status = structs.HealthCritical
if err := store.EnsureCheck(15, check); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteHealthCheck(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "bar",
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"bar"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the check
if err := store.DeleteNodeCheck(15, "foo", "bar"); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteNode(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the node
if err := store.DeleteNode(15, "foo"); err != nil {
t.Fatalf("err: %v")
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", nil, 5000}); err != nil {
t.Fatalf("err: %v", err)
}
check := &structs.HealthCheck{
Node: "foo",
CheckID: "api",
Name: "Can connect",
Status: structs.HealthPassing,
ServiceID: "api",
}
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{
Node: "foo",
Checks: []string{"api"},
}
if err := store.SessionCreate(14, session); err != nil {
t.Fatalf("err: %v", err)
}
// Should invalidate the session
if err := store.DeleteNodeService(15, "foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup by ID, should be nil
_, s2, err := store.SessionGet(session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s2 != nil {
t.Fatalf("session should be invalidated")
}
}
func TestKVSLock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock with a non-existing keys should work
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", d)
}
// Re-locking should fail
ok, err = store.KVSLock(6, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
// Set a normal key
k1 := &structs.DirEntry{
Key: "/bar",
Flags: 0,
Value: []byte("asdf"),
}
if err := store.KVSSet(7, k1); err != nil {
t.Fatalf("err: %v", err)
}
// Should acquire the lock
k1.Session = session.ID
ok, err = store.KVSLock(8, k1)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Re-acquire should fail
ok, err = store.KVSLock(9, k1)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
}
func TestKVSUnlock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Unlock with a non-existing keys should fail
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSUnlock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("expected fail")
}
// Lock should work
d.Session = session.ID
if ok, _ := store.KVSLock(6, d); !ok {
t.Fatalf("expected lock")
}
// Unlock should work
d.Session = session.ID
ok, err = store.KVSUnlock(7, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Re-lock should work
d.Session = session.ID
if ok, err := store.KVSLock(8, d); err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("expected lock")
}
if d.LockIndex != 2 {
t.Fatalf("bad: %v", d)
}
}
func TestSessionInvalidate_KeyUnlock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock a key with the session
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Delete the node
if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v")
}
// Key should be unlocked
idx, d2, err := store.KVSGet("/foo")
if idx != 6 {
t.Fatalf("bad: %v", idx)
}
if d2.LockIndex != 1 {
t.Fatalf("bad: %v", *d2)
}
if d2.Session != "" {
t.Fatalf("bad: %v", *d2)
}
// Key should have a lock delay
expires := store.KVSLockDelay("/foo")
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
t.Fatalf("Bad: %v", expires)
}
}

View File

@ -19,6 +19,7 @@ const (
RegisterRequestType MessageType = iota
DeregisterRequestType
KVSRequestType
SessionRequestType
)
const (
@ -28,6 +29,12 @@ const (
HealthCritical = "critical"
)
const (
// MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second
)
// RPCInfo is used to describe common information about query
type RPCInfo interface {
RequestDatacenter() string
@ -275,9 +282,11 @@ type IndexedNodeDump struct {
type DirEntry struct {
CreateIndex uint64
ModifyIndex uint64
LockIndex uint64
Key string
Flags uint64
Value []byte
Session string `json:",omitempty"`
}
type DirEntries []*DirEntry
@ -287,7 +296,9 @@ const (
KVSSet KVSOp = "set"
KVSDelete = "delete"
KVSDeleteTree = "delete-tree"
KVSCAS = "cas" // Check-and-set
KVSCAS = "cas" // Check-and-set
KVSLock = "lock" // Lock a key
KVSUnlock = "unlock" // Unlock a key
)
// KVSRequest is used to operate on the Key-Value store
@ -335,6 +346,52 @@ type IndexedKeyList struct {
QueryMeta
}
// Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks.
type Session struct {
CreateIndex uint64
ID string
Node string
Checks []string
LockDelay time.Duration
}
type Sessions []*Session
type SessionOp string
const (
SessionCreate SessionOp = "create"
SessionDestroy = "destroy"
)
// SessionRequest is used to operate on sessions
type SessionRequest struct {
Datacenter string
Op SessionOp // Which operation are we performing
Session Session // Which session
WriteRequest
}
func (r *SessionRequest) RequestDatacenter() string {
return r.Datacenter
}
// SessionSpecificRequest is used to request a session by ID
type SessionSpecificRequest struct {
Datacenter string
Session string
QueryOptions
}
func (r *SessionSpecificRequest) RequestDatacenter() string {
return r.Datacenter
}
type IndexedSessions struct {
Sessions Sessions
QueryMeta
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle

View File

@ -1,6 +1,7 @@
package consul
import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"github.com/hashicorp/serf/serf"
@ -160,3 +161,18 @@ func runtimeStats() map[string]string {
"cpu_count": strconv.FormatInt(int64(runtime.NumCPU()), 10),
}
}
// generateUUID is used to generate a random UUID
func generateUUID() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}

View File

@ -2,6 +2,7 @@ package consul
import (
"github.com/hashicorp/serf/serf"
"regexp"
"testing"
)
@ -75,3 +76,19 @@ func TestByteConversion(t *testing.T) {
t.Fatalf("no match")
}
}
func TestGenerateUUID(t *testing.T) {
prev := generateUUID()
for i := 0; i < 100; i++ {
id := generateUUID()
if prev == id {
t.Fatalf("Should get a new ID!")
}
matched, err := regexp.MatchString(
"[\\da-f]{8}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{4}-[\\da-f]{12}", id)
if !matched || err != nil {
t.Fatalf("expected match %s %v %s", id, matched, err)
}
}
}

View File

@ -1,9 +1,9 @@
package testutil
import (
"time"
"testing"
"github.com/hashicorp/consul/consul/structs"
"testing"
"time"
)
type testFn func() (bool, error)
@ -27,7 +27,7 @@ func WaitForResult(test testFn, error errorFn) {
}
}
type rpcFn func(string, interface {}, interface {}) error
type rpcFn func(string, interface{}, interface{}) error
func WaitForLeader(t *testing.T, rpc rpcFn, dc string) structs.IndexedNodes {
var out structs.IndexedNodes

View File

@ -10,12 +10,13 @@ The main interface to Consul is a RESTful HTTP API. The API can be
used for CRUD for nodes, services, checks, and configuration. The endpoints are
versioned to enable changes without breaking backwards compatibility.
All endpoints fall into one of 6 categories:
All endpoints fall into one of several categories:
* kv - Key/Value store
* agent - Agent control
* catalog - Manages nodes and services
* health - Manages health checks
* session - Session manipulation
* status - Consul system status
* internal - Internal APIs. Purposely undocumented, subject to change.
@ -94,6 +95,8 @@ By default the datacenter of the agent is queried, however the dc can
be provided using the "?dc=" query parameter. If a client wants to write
to all Datacenters, one request per datacenter must be made.
### GET Method
When using the `GET` method, Consul will return the specified key,
or if the "?recurse" query parameter is provided, it will return
all keys with the given prefix.
@ -104,9 +107,11 @@ Each object will look like:
{
"CreateIndex": 100,
"ModifyIndex": 200,
"LockIndex": 200,
"Key": "zip",
"Flags": 0,
"Value": "dGVzdA=="
"Value": "dGVzdA==",
"Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
}
]
@ -116,37 +121,14 @@ that modified this key. This index corresponds to the `X-Consul-Index`
header value that is returned. A blocking query can be used to wait for
a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds
to the latest `ModifyIndex` and so a blocking query waits until any of the
listed keys are updated. The multiple consistency modes can be used for
`GET` requests as well.
listed keys are updated. The `LockIndex` is the last index of a successful
lock acquisition. If the lock is held, the `Session` key provides the
session that owns the lock.
The `Key` is simply the full path of the entry. `Flags` are an opaque
unsigned integer that can be attached to each entry. The use of this is
left totally to the user. Lastly, the `Value` is a base64 key value.
If no entries are found, a 404 code is returned.
When using the `PUT` method, Consul expects the request body to be the
value corresponding to the key. There are a number of parameters that can
be used with a PUT request:
* ?flags=\<num\> : This can be used to specify an unsigned value between
0 and 2^64-1. It is opaque to the user, but a client application may
use it.
* ?cas=\<index\> : This flag is used to turn the `PUT` into a **Check-And-Set**
operation. This is very useful as it allows clients to build more complex
syncronization primitives on top. If the index is 0, then Consul will only
put the key if it does not already exist. If the index is non-zero, then
the key is only set if the index matches the `ModifyIndex` of that key.
The return value is simply either `true` or `false`. If the CAS check fails,
then `false` will be returned.
Lastly, the `DELETE` method can be used to delete a single key or all
keys sharing a prefix. If the "?recurse" query parameter is provided,
then all keys with the prefix are deleted, otherwise only the specified
key.
It is possible to also only list keys without any values by using the
"?keys" query parameter along with a `GET` request. This will return
a list of the keys under the given prefix. The optional "?separator="
@ -163,6 +145,47 @@ For example, listing "/web/" with a "/" seperator may return:
Using the key listing method may be suitable when you do not need
the values or flags, or want to implement a key-space explorer.
If no entries are found, a 404 code is returned.
This endpoint supports blocking queries and all consistency modes.
### PUT method
When using the `PUT` method, Consul expects the request body to be the
value corresponding to the key. There are a number of parameters that can
be used with a PUT request:
* ?flags=\<num\> : This can be used to specify an unsigned value between
0 and 2^64-1. It is opaque to the user, but a client application may
use it.
* ?cas=\<index\> : This flag is used to turn the `PUT` into a Check-And-Set
operation. This is very useful as it allows clients to build more complex
syncronization primitives on top. If the index is 0, then Consul will only
put the key if it does not already exist. If the index is non-zero, then
the key is only set if the index matches the `ModifyIndex` of that key.
* ?acquire=\<session\> : This flag is used to turn the `PUT` into a lock acquisition
operation. This is useful as it allows leader election to be built on top
of Consul. If the lock is not held and the session is valid, this increments
the `LockIndex` and sets the `Session` value of the key in addition to updating
the key contents. A key does not need to exist to be acquired.
* ?release=\<session\> : This flag is used to turn the `PUT` into a lock release
operation. This is useful when paired with "?acquire=" as it allows clients to
yield a lock. This will leave the `LockIndex` unmodified but will clear the associated
`Session` of the key. The key must be held by this session to be unlocked.
The return value is simply either `true` or `false`. If `false` is returned,
then the update has not taken place.
### DELETE method
Lastly, the `DELETE` method can be used to delete a single key or all
keys sharing a prefix. If the "?recurse" query parameter is provided,
then all keys with the prefix are deleted, otherwise only the specified
key.
## Agent
The Agent endpoints are used to interact with a local Consul agent. Usually,
@ -805,6 +828,137 @@ It returns a JSON body like this:
This endpoint supports blocking queries and all consistency modes.
## Session
The Session endpoints are used to create, destroy and query sessions.
The following endpoints are supported:
* /v1/session/create: Creates a new session
* /v1/session/destroy/\<session\>: Destroys a given session
* /v1/session/info/\<session\>: Queries a given session
* /v1/session/node/\<node\>: Lists sessions belonging to a node
* /v1/session/list: Lists all the active sessions
All of the read session endpoints supports blocking queries and all consistency modes.
### /v1/session/create
The create endpoint is used to initialize a new session.
There is more documentation on sessions [here](/docs/internals/sessions.html).
Sessions must be associated with a node, and optionally any number of checks.
By default, the agent uses it's own node name, and provides the "serfHealth"
check, along with a 15 second lock delay.
By default, the agent's local datacenter is used, but another datacenter
can be specified using the "?dc=" query parameter. It is not recommended
to use cross-region sessions.
The create endpoint expects a JSON request body to be PUT. The request
body must look like:
{
"LockDelay": "15s",
"Node": "foobar",
"Checks": ["a", "b", "c"]
}
None of the fields are mandatory, and in fact no body needs to be PUT
if the defaults are to be used. The `LockDelay` field can be specified
as a duration string using a "s" suffix for seconds. It can also be a numeric
value. Small values are treated as seconds, and otherwise it is provided with
nanosecond granularity.
The `Node` field must refer to a node that is already registered. By default,
the agent will use it's own name. Lastly, the `Checks` field is used to provide
a list of associated health checks. By default the "serfHealth" check is provided.
It is highly recommended that if you override this list, you include that check.
The return code is 200 on success, along with a body like:
{"ID":"adf4238a-882b-9ddc-4a9d-5b6758e4159e"}
This is used to provide the ID of the newly created session.
### /v1/session/destroy/\<session\>
The destroy endpoint is hit with a PUT and destroys the given session.
By default the local datacenter is used, but the "?dc=" query parameter
can be used to specify the datacenter. The session being destroyed must
be provided after the slash.
The return code is 200 on success.
### /v1/session/info/\<session\>
This endpoint is hit with a GET and returns the session information
by ID within a given datacenter. By default the datacenter of the agent is queried,
however the dc can be provided using the "?dc=" query parameter.
The session being queried must be provided after the slash.
It returns a JSON body like this:
[
{
"LockDelay": 1.5e+10,
"Checks": [
"serfHealth"
],
"Node": "foobar",
"ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"CreateIndex": 1086449
}
]
If the session is not found, null is returned instead of a JSON list.
This endpoint supports blocking queries and all consistency modes.
### /v1/session/node/\<node\>
This endpoint is hit with a GET and returns the active sessions
for a given node and datacenter. By default the datacenter of the agent is queried,
however the dc can be provided using the "?dc=" query parameter.
The node being queried must be provided after the slash.
It returns a JSON body like this:
[
{
"LockDelay": 1.5e+10,
"Checks": [
"serfHealth"
],
"Node": "foobar",
"ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"CreateIndex": 1086449
},
...
]
This endpoint supports blocking queries and all consistency modes.
### /v1/session/list
This endpoint is hit with a GET and returns the active sessions
for a given datacenter. By default the datacenter of the agent is queried,
however the dc can be provided using the "?dc=" query parameter.
It returns a JSON body like this:
[
{
"LockDelay": 1.5e+10,
"Checks": [
"serfHealth"
],
"Node": "foobar",
"ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"CreateIndex": 1086449
},
...
]
This endpoint supports blocking queries and all consistency modes.
## Status
The Status endpoints are used to get information about the status

View File

@ -0,0 +1,73 @@
---
layout: "docs"
page_title: "Leader Election"
sidebar_current: "docs-guides-leader"
---
# Leader Election
The goal of this guide is to cover how to build client-side leader election using Consul.
If you are interested in the leader election used internally to Consul, you want to
read about the [consensus protocol](/docs/internals/consensus.html) instead.
There are a number of ways that leader election can be built, so our goal is not to
cover all the possible methods. Instead, we will focus on using Consul's support for
[sessions](/docs/internals/sessions.html), which allow us to build a system that can
gracefully handle failures.
## Contending Nodes
The first flow we cover is for nodes who are attempting to acquire leadership
for a given service. All nodes that are participating should agree on a given
key being used to coordinate. A good choice is simply:
service/<service name>/leader
We will refer to this as just `key` for simplicy.
The first step is to create a session. This is done using the /v1/session/create endpoint.
The session by default makes use of only the gossip failure detector. Additional checks
can be specified if desired. The session ID returned will be refered to as `session`.
Create `body` to represent the local node. This can be a simple JSON object
that contains the node's name, port or any application specific information
that may be needed.
Attempt to `acquire` the `key` by doing a `PUT`. This is something like:
curl -X PUT -d body http://localhost:8500/v1/kv/key?acquire=session
This will either return `true` or `false`. If `true` is returned, the lock
has been acquired and the local node is now the leader. If `false` is returned,
some other node has acquired the lock.
All nodes now remain in an idle waiting state. In this state, we watch for changes
on `key`. This is because the lock may be released, the node may fail, etc.
The leader must also watch for changes since it's lock may be released by an operator,
or automatically released due to a false positive in the failure detector.
Watching for changes is done by doing a blocking query against `key`. If we ever
notice that the `Session` of the `key` is blank, then there is no leader, and we should
retry acquiring the lock. Each attempt to acquire the key should be seperated by a timed
wait. This is because Consul may be enforcing a [`lock-delay`](/docs/internals/sessions.html).
If the leader ever wishes to step down voluntarily, this should be done by simply
releasing the lock:
curl -X PUT http://localhost:8500/v1/kv/key?release=session
## Discovering a Leader
The second flow is for nodes who are attempting to discover the leader
for a given servie. All nodes that are participating should agree on the key
being used to coordinate, including the contendors. This key will be referred
to as just `key`.
Clients have a very simple role, they simply read `key` to discover who the current
leader is. If the key has no associated `Session`, then there is no leader. Otherwise,
the value of the key will provide all the application-dependent information required.
Clients should also watch the key using a blocking query for any changes. If the leader
steps down, or fails, then the `Session` associated with the key will be cleared. When
a new leader is elected, the key value will also be updated.

View File

@ -0,0 +1,114 @@
---
layout: "docs"
page_title: "Sessions"
sidebar_current: "docs-internals-sessions"
---
# Sessions
Consul provides a session mechansim which can be used to build distributed locks.
Sessions act as a binding layer between nodes, health checks, and key/value data.
They are designed to provide granular locking, and are heavily inspired
by [The Chubby Lock Service for Loosely-Coupled Distributed Systems](http://research.google.com/archive/chubby.html).
<div class="alert alert-block alert-warning">
<strong>Advanced Topic!</strong> This page covers technical details of
the internals of Consul. You don't need to know these details to effectively
operate and use Consul. These details are documented here for those who wish
to learn about them without having to go spelunking through the source code.
</div>
## Session Design
A session in Consul represents a contract that has very specific semantics.
When a session is constructed a node name, a list of health checks, and a
`lock-delay` are provided. The newly constructed session is provided with
a named ID which can be used to refer to it. This ID can be used with the KV
store to acquire locks, which are advisory mechanisms for mutual exclusion.
Below is a diagram showing the relationship between these components:
![Session Architecture](/images/consul-sessions.png)
The contract that Consul provides is that under any of the folllowing
situations the session will be *invalidated*:
* Node is deregistered
* Any of the health checks are deregistered
* Any of the health checks go to the critical state
* Session is explicitly destroyed
When a session is invalidated, any of the locks held in association
with the session are released, and the `ModifyIndex` of the key is
incremented. The session is also destroyed during an invalidation
and can no longer be used to acquire further locks.
While this is a simple design, it enables a multitude of usage
patterns. By default, the [gossip based failure detector](/docs/internals/gossip.html)
is used as the associated health check. This failure detector allows
Consul to detect when a node that is holding a lock has failed, and
to automatically release the lock. This ability provides **liveness** to
Consul locks, meaning under failure the system can continue to make
progress. However, because there is no perfect failure detector, it's possible
to have a false positive (failure detected) which causes the lock to
be released even though the lock owner is still alive. This means
we are sacrificing some **safety**.
Conversely, it is possible to create a session with no associated
health checks. This removes the possibility of a false positive,
and trades liveness for safety. You can be absolutely certain Consul
will not release the lock even if the existing owner has failed.
Since Consul APIs allow a session to be force destroyed, this allows
systems to be built that require an operator to intervene in the
case of a failure, but preclude the possibility of a split-brain.
The final nuance is that sessions may provide a `lock-delay`. This
is a time duration, between 0 and 60 second. When a session invalidation
takes place, Consul prevents any of the previously held locks from
being re-acquired for the `lock-delay` interval; this is a safe guard
inspired by Google's Chubby. The purpose of this delay is to allow
the potentially still live leader to detect the invalidation and stop
processing requests that may lead to inconsistent state. While not a
bulletproof method, it does avoid the need to introduce sleep states
into application logic, and can help mitigate many issues. While the
default is to use a 15 second delay, clients are able to disable this
mechanism by providing a zero delay value.
## KV Integration
Integration between the Key/Value store and sessions are the primary
place where sessions are used. A session must be created prior to use,
and is then refered to by it's ID.
The Key/Value API is extended to support an `acquire` and `release` operation.
The `acquire` operation acts like a Check-And-Set operation, except it
can only succeed if there is no existing lock holder. On success, there
is a normal key update, but there is also an increment to the `LockIndex`,
and the `Session` value is updated to reflect the session holding the lock.
Once held, the lock can be released using a corresponding `release` operation,
providing the same session. Again, this acts like a Check-And-Set operations,
since the request will fail if given an invalid session. A critical note is
that the lock can be released without being the creator of the session.
This is by design, as it allows operators to intervene and force terminate
a session if necessary. As mentioned above, a session invalidation will also
cause all held locks to be released. When a lock is released, the `LockIndex`,
does not change, however the `Session` is cleared and the `ModifyIndex` increments.
These semantics (heavily borrowed from Chubby), allow the tuple of (Key, LockIndex, Session)
to act as a unique "sequencer". This `sequencer` can be passed around and used
to verify if the request belongs to the current lock holder. Because the `LockIndex`
is incremented on each `acquire`, even if the same session re-acquires a lock,
the `sequencer` will be able to detect a stale request. Similarly, if a session is
invalided, the Session corresponding to the given `LockIndex` will be blank.
To make clear, this locking system is purely *advisory*. There is no enforcement
that clients must acquire a lock to perform any operation. Any client can
read, write, and delete a key without owning the corresponding lock. It is not
the goal of Consul to protect against misbehaving clients.
## Leader Election
The primitives provided by sessions and the locking mechanisms of the KV
store can be used to build client-side leader election algorithms.
These are covered in more detail in the [Leader Election guide](/docs/guides/leader-election.html).

BIN
website/source/images/consul-sessions.png (Stored with Git LFS) Normal file

Binary file not shown.

View File

@ -34,6 +34,10 @@
<a href="/docs/internals/gossip.html">Gossip Protocol</a>
</li>
<li<%= sidebar_current("docs-internals-sessions") %>>
<a href="/docs/internals/sessions.html">Sessions</a>
</li>
<li<%= sidebar_current("docs-internals-security") %>>
<a href="/docs/internals/security.html">Security Model</a>
</li>
@ -140,6 +144,10 @@
<a href="/docs/guides/external.html">External Services</a>
</li>
<li<%= sidebar_current("docs-guides-leader") %>>
<a href="/docs/guides/leader-election.html">Leader Election</a>
</li>
<li<%= sidebar_current("docs-guides-datacenters") %>>
<a href="/docs/guides/datacenters.html">Multiple Datacenters</a>
</li>