Merge pull request #487 from amalaviy/ephemeral_keys
Ephemeral Nodes for via Session behavior settings.
This commit is contained in:
commit
b58b35d659
|
@ -32,13 +32,14 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request)
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Default the session to our node + serf check
|
||||
// Default the session to our node + serf check + release session invalidate behavior
|
||||
args := structs.SessionRequest{
|
||||
Op: structs.SessionCreate,
|
||||
Session: structs.Session{
|
||||
Node: s.agent.config.NodeName,
|
||||
Checks: []string{consul.SerfCheckID},
|
||||
LockDelay: 15 * time.Second,
|
||||
Behavior: structs.SessionKeysRelease,
|
||||
},
|
||||
}
|
||||
s.parseDC(req, &args.Datacenter)
|
||||
|
|
|
@ -59,6 +59,55 @@ func TestSessionCreate(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestSessionCreateDelete(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
// Create a health check
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: srv.agent.config.NodeName,
|
||||
Address: "127.0.0.1",
|
||||
Check: &structs.HealthCheck{
|
||||
CheckID: "consul",
|
||||
Node: srv.agent.config.NodeName,
|
||||
Name: "consul",
|
||||
ServiceID: "consul",
|
||||
Status: structs.HealthPassing,
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Associate session with node and 2 health checks, and make it delete on session destroy
|
||||
body := bytes.NewBuffer(nil)
|
||||
enc := json.NewEncoder(body)
|
||||
raw := map[string]interface{}{
|
||||
"Name": "my-cool-session",
|
||||
"Node": srv.agent.config.NodeName,
|
||||
"Checks": []string{consul.SerfCheckID, "consul"},
|
||||
"LockDelay": "20s",
|
||||
"Behavior": structs.SessionKeysDelete,
|
||||
}
|
||||
enc.Encode(raw)
|
||||
|
||||
req, err := http.NewRequest("PUT", "/v1/session/create", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.SessionCreate(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if _, ok := obj.(sessionCreateResponse); !ok {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestFixupLockDelay(t *testing.T) {
|
||||
inp := map[string]interface{}{
|
||||
"lockdelay": float64(15),
|
||||
|
@ -105,6 +154,28 @@ func makeTestSession(t *testing.T, srv *HTTPServer) string {
|
|||
return sessResp.ID
|
||||
}
|
||||
|
||||
func makeTestSessionDelete(t *testing.T, srv *HTTPServer) string {
|
||||
// Create Session with delete behavior
|
||||
body := bytes.NewBuffer(nil)
|
||||
enc := json.NewEncoder(body)
|
||||
raw := map[string]interface{}{
|
||||
"Behavior": "delete",
|
||||
}
|
||||
enc.Encode(raw)
|
||||
|
||||
req, err := http.NewRequest("PUT", "/v1/session/create", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.SessionCreate(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
sessResp := obj.(sessionCreateResponse)
|
||||
return sessResp.ID
|
||||
}
|
||||
|
||||
func TestSessionDestroy(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
id := makeTestSession(t, srv)
|
||||
|
@ -188,3 +259,45 @@ func TestSessionsForNode(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSessionDeleteDestroy(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
id := makeTestSessionDelete(t, srv)
|
||||
|
||||
// now create a new key for the session and acquire it
|
||||
buf := bytes.NewBuffer([]byte("test"))
|
||||
req, err := http.NewRequest("PUT", "/v1/kv/ephemeral?acquire="+id, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.KVSEndpoint(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if res := obj.(bool); !res {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
|
||||
// now destroy the session, this should delete the key created above
|
||||
req, err = http.NewRequest("PUT", "/v1/session/destroy/"+id, nil)
|
||||
resp = httptest.NewRecorder()
|
||||
obj, err = srv.SessionDestroy(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp := obj.(bool); !resp {
|
||||
t.Fatalf("should work")
|
||||
}
|
||||
|
||||
// Verify that the key is gone
|
||||
req, _ = http.NewRequest("GET", "/v1/kv/ephemeral", nil)
|
||||
resp = httptest.NewRecorder()
|
||||
obj, _ = srv.KVSEndpoint(resp, req)
|
||||
res, found := obj.(structs.DirEntries)
|
||||
if found || len(res) != 0 {
|
||||
t.Fatalf("bad: %v found, should be nothing", res)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -28,6 +28,14 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
|
|||
if args.Session.Node == "" && args.Op == structs.SessionCreate {
|
||||
return fmt.Errorf("Must provide Node")
|
||||
}
|
||||
switch args.Session.Behavior {
|
||||
case structs.SessionKeysRelease, structs.SessionKeysDelete:
|
||||
// we like it, use it
|
||||
case "":
|
||||
args.Session.Behavior = structs.SessionKeysRelease // force default behavior
|
||||
default:
|
||||
return fmt.Errorf("Invalid Behavior setting '%s'", args.Session.Behavior)
|
||||
}
|
||||
|
||||
// If this is a create, we must generate the Session ID. This must
|
||||
// be done prior to appending to the raft log, because the ID is not
|
||||
|
|
|
@ -66,6 +66,69 @@ func TestSessionEndpoint_Apply(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSessionEndpoint_DeleteApply(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
testutil.WaitForLeader(t, client.Call, "dc1")
|
||||
|
||||
// Just add a node
|
||||
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||
|
||||
arg := structs.SessionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.SessionCreate,
|
||||
Session: structs.Session{
|
||||
Node: "foo",
|
||||
Name: "my-session",
|
||||
Behavior: structs.SessionKeysDelete,
|
||||
},
|
||||
}
|
||||
var out string
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
||||
// Verify
|
||||
state := s1.fsm.State()
|
||||
_, s, err := state.SessionGet(out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if s == nil {
|
||||
t.Fatalf("should not be nil")
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Name != "my-session" {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if s.Behavior != structs.SessionKeysDelete {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
|
||||
// Do a delete
|
||||
arg.Op = structs.SessionDestroy
|
||||
arg.Session.ID = out
|
||||
if err := client.Call("Session.Apply", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify
|
||||
_, s, err = state.SessionGet(id)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if s != nil {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionEndpoint_Get(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
|
|
@ -1327,6 +1327,15 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
|
|||
return fmt.Errorf("Missing Session ID")
|
||||
}
|
||||
|
||||
switch session.Behavior {
|
||||
case structs.SessionKeysRelease, structs.SessionKeysDelete:
|
||||
// we like
|
||||
case "":
|
||||
session.Behavior = structs.SessionKeysRelease // force default behavior
|
||||
default:
|
||||
return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior)
|
||||
}
|
||||
|
||||
// Assign the create index
|
||||
session.CreateIndex = index
|
||||
|
||||
|
@ -1454,7 +1463,7 @@ func (s *StateStore) SessionDestroy(index uint64, id string) error {
|
|||
}
|
||||
defer tx.Abort()
|
||||
|
||||
log.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
|
||||
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
|
||||
id)
|
||||
if err := s.invalidateSession(index, tx, id); err != nil {
|
||||
return err
|
||||
|
@ -1471,7 +1480,7 @@ func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error
|
|||
}
|
||||
for _, sess := range sessions {
|
||||
session := sess.(*structs.Session).ID
|
||||
log.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
|
||||
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
|
||||
session, node)
|
||||
if err := s.invalidateSession(index, tx, session); err != nil {
|
||||
return err
|
||||
|
@ -1489,7 +1498,7 @@ func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check strin
|
|||
}
|
||||
for _, sc := range sessionChecks {
|
||||
session := sc.(*sessionCheck).Session
|
||||
log.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
|
||||
s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
|
||||
session, check)
|
||||
if err := s.invalidateSession(index, tx, session); err != nil {
|
||||
return err
|
||||
|
@ -1513,15 +1522,23 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
|
|||
}
|
||||
session := res[0].(*structs.Session)
|
||||
|
||||
// Enforce the MaxLockDelay
|
||||
delay := session.LockDelay
|
||||
if delay > structs.MaxLockDelay {
|
||||
delay = structs.MaxLockDelay
|
||||
}
|
||||
if session.Behavior == structs.SessionKeysDelete {
|
||||
// delete the keys held by the session
|
||||
if err := s.deleteKeys(index, tx, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Invalidate any held locks
|
||||
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
||||
return err
|
||||
} else { // default to release
|
||||
// Enforce the MaxLockDelay
|
||||
delay := session.LockDelay
|
||||
if delay > structs.MaxLockDelay {
|
||||
delay = structs.MaxLockDelay
|
||||
}
|
||||
|
||||
// Invalidate any held locks
|
||||
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Nuke the session
|
||||
|
@ -1588,6 +1605,23 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
|||
return nil
|
||||
}
|
||||
|
||||
// deleteKeys is used to delete all the keys created by a session
|
||||
// within a given txn. All tables should be locked in the tx.
|
||||
func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error {
|
||||
num, err := s.kvsTable.DeleteTxn(tx, "session", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if num > 0 {
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ACLSet is used to create or update an ACL entry
|
||||
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
|
||||
// Check for an ID
|
||||
|
|
|
@ -2279,6 +2279,53 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSessionInvalidate_KeyDelete(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
session := &structs.Session{
|
||||
ID: generateUUID(),
|
||||
Node: "foo",
|
||||
LockDelay: 50 * time.Millisecond,
|
||||
Behavior: structs.SessionKeysDelete,
|
||||
}
|
||||
if err := store.SessionCreate(4, session); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Lock a key with the session
|
||||
d := &structs.DirEntry{
|
||||
Key: "/bar",
|
||||
Flags: 42,
|
||||
Value: []byte("test"),
|
||||
Session: session.ID,
|
||||
}
|
||||
ok, err := store.KVSLock(5, d)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("unexpected fail")
|
||||
}
|
||||
|
||||
// Delete the node
|
||||
if err := store.DeleteNode(6, "foo"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Key should be deleted
|
||||
_, d2, err := store.KVSGet("/bar")
|
||||
if d2 != nil {
|
||||
t.Fatalf("unexpected undeleted key")
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLSet_Get(t *testing.T) {
|
||||
store, err := testStateStore()
|
||||
if err != nil {
|
||||
|
|
|
@ -378,6 +378,13 @@ type IndexedKeyList struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
type SessionBehavior string
|
||||
|
||||
const (
|
||||
SessionKeysRelease SessionBehavior = "release"
|
||||
SessionKeysDelete = "delete"
|
||||
)
|
||||
|
||||
// Session is used to represent an open session in the KV store.
|
||||
// This issued to associate node checks with acquired locks.
|
||||
type Session struct {
|
||||
|
@ -387,6 +394,7 @@ type Session struct {
|
|||
Node string
|
||||
Checks []string
|
||||
LockDelay time.Duration
|
||||
Behavior SessionBehavior // What to do when session is invalidated
|
||||
}
|
||||
type Sessions []*Session
|
||||
|
||||
|
|
Loading…
Reference in New Issue