open-consul/agent/consul/state/session.go

345 lines
9.8 KiB
Go
Raw Normal View History

package state
import (
"fmt"
"time"
pkg refactor command/agent/* -> agent/* command/consul/* -> agent/consul/* command/agent/command{,_test}.go -> command/agent{,_test}.go command/base/command.go -> command/base.go command/base/* -> command/* commands.go -> command/commands.go The script which did the refactor is: ( cd $GOPATH/src/github.com/hashicorp/consul git mv command/agent/command.go command/agent.go git mv command/agent/command_test.go command/agent_test.go git mv command/agent/flag_slice_value{,_test}.go command/ git mv command/agent . git mv command/base/command.go command/base.go git mv command/base/config_util{,_test}.go command/ git mv commands.go command/ git mv consul agent rmdir command/base/ gsed -i -e 's|package agent|package command|' command/agent{,_test}.go gsed -i -e 's|package agent|package command|' command/flag_slice_value{,_test}.go gsed -i -e 's|package base|package command|' command/base.go command/config_util{,_test}.go gsed -i -e 's|package main|package command|' command/commands.go gsed -i -e 's|base.Command|BaseCommand|' command/commands.go gsed -i -e 's|agent.Command|AgentCommand|' command/commands.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/commands.go gsed -i -e 's|base\.||' command/commands.go gsed -i -e 's|command\.||' command/commands.go gsed -i -e 's|command|c|' main.go gsed -i -e 's|range Commands|range command.Commands|' main.go gsed -i -e 's|Commands: Commands|Commands: command.Commands|' main.go gsed -i -e 's|base\.BoolValue|BoolValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.DurationValue|DurationValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.StringValue|StringValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.UintValue|UintValue|' command/operator_autopilot_set.go gsed -i -e 's|\bCommand\b|BaseCommand|' command/base.go gsed -i -e 's|BaseCommand Options|Command Options|' command/base.go gsed -i -e 's|base.Command|BaseCommand|' command/*.go gsed -i -e 's|c\.Command|c.BaseCommand|g' command/*.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/*_test.go gsed -i -e 's|base\.||' command/*_test.go gsed -i -e 's|\bCommand\b|AgentCommand|' command/agent{,_test}.go gsed -i -e 's|cmd.AgentCommand|cmd.BaseCommand|' command/agent.go gsed -i -e 's|cli.AgentCommand = new(Command)|cli.Command = new(AgentCommand)|' command/agent_test.go gsed -i -e 's|exec.AgentCommand|exec.Command|' command/agent_test.go gsed -i -e 's|exec.BaseCommand|exec.Command|' command/agent_test.go gsed -i -e 's|NewTestAgent|agent.NewTestAgent|' command/agent_test.go gsed -i -e 's|= TestConfig|= agent.TestConfig|' command/agent_test.go gsed -i -e 's|: RetryJoin|: agent.RetryJoin|' command/agent_test.go gsed -i -e 's|\.\./\.\./|../|' command/config_util_test.go gsed -i -e 's|\bverifyUniqueListeners|VerifyUniqueListeners|' agent/config{,_test}.go command/agent.go gsed -i -e 's|\bserfLANKeyring\b|SerfLANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bserfWANKeyring\b|SerfWANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bNewAgent\b|agent.New|g' command/agent{,_test}.go gsed -i -e 's|\bNewAgent|New|' agent/{acl_test,agent,testagent}.go gsed -i -e 's|\bAgent\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bBool\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDefaultConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDevConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bMergeConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bReadConfigPaths\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bParseMetaPair\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfLANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfWANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|circonus\.agent|circonus|g' command/agent{,_test}.go gsed -i -e 's|logger\.agent|logger|g' command/agent{,_test}.go gsed -i -e 's|metrics\.agent|metrics|g' command/agent{,_test}.go gsed -i -e 's|// agent.Agent|// agent|' command/agent{,_test}.go gsed -i -e 's|a\.agent\.Config|a.Config|' command/agent{,_test}.go gsed -i -e 's|agent\.AppendSliceValue|AppendSliceValue|' command/{configtest,validate}.go gsed -i -e 's|consul/consul|agent/consul|' GNUmakefile gsed -i -e 's|\.\./test|../../test|' agent/consul/server_test.go # fix imports f=$(grep -rl 'github.com/hashicorp/consul/command/agent' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/command/agent|github.com/hashicorp/consul/agent|' $f goimports -w $f f=$(grep -rl 'github.com/hashicorp/consul/consul' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/consul|github.com/hashicorp/consul/agent/consul|' $f goimports -w $f goimports -w command/*.go main.go )
2017-06-09 22:28:28 +00:00
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-memdb"
)
// Sessions is used to pull the full list of sessions for use during snapshots.
2017-04-21 00:46:29 +00:00
func (s *Snapshot) Sessions() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("sessions", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Session is used when restoring from a snapshot. For general inserts, use
// SessionCreate.
2017-04-21 00:46:29 +00:00
func (s *Restore) Session(sess *structs.Session) error {
// Insert the session.
if err := s.tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings.
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := s.tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index.
if err := indexUpdateMaxTxn(s.tx, sess.ModifyIndex, "sessions"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// SessionCreate is used to register a new session in the state store.
2017-04-21 00:46:29 +00:00
func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
tx := s.db.Txn(true)
defer tx.Abort()
// This code is technically able to (incorrectly) update an existing
// session but we never do that in practice. The upstream endpoint code
// always adds a unique ID when doing a create operation so we never hit
// an existing session again. It isn't worth the overhead to verify
// that here, but it's worth noting that we should never do this in the
// future.
// Call the session creation
if err := s.sessionCreateTxn(tx, idx, sess); err != nil {
return err
}
tx.Commit()
return nil
}
// sessionCreateTxn is the inner method used for creating session entries in
// an open transaction. Any health checks registered with the session will be
// checked for failing status. Returns any error encountered.
2017-04-21 00:46:29 +00:00
func (s *Store) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
// Check that we have a session ID
if sess.ID == "" {
return ErrMissingSessionID
}
// Verify the session behavior is valid
switch sess.Behavior {
case "":
// Release by default to preserve backwards compatibility
sess.Behavior = structs.SessionKeysRelease
case structs.SessionKeysRelease:
case structs.SessionKeysDelete:
default:
return fmt.Errorf("Invalid session behavior: %s", sess.Behavior)
}
// Assign the indexes. ModifyIndex likely will not be used but
// we set it here anyways for sanity.
sess.CreateIndex = idx
sess.ModifyIndex = idx
// Check that the node exists
node, err := tx.First("nodes", "id", sess.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return ErrMissingNode
}
// Go over the session checks and ensure they exist.
for _, checkID := range sess.Checks {
check, err := tx.First("checks", "id", sess.Node, string(checkID))
if err != nil {
return fmt.Errorf("failed check lookup: %s", err)
}
if check == nil {
return fmt.Errorf("Missing check '%s' registration", checkID)
}
// Check that the check is not in critical state
status := check.(*structs.HealthCheck).Status
if status == api.HealthCritical {
return fmt.Errorf("Check '%s' is in %s state", checkID, status)
}
}
// Insert the session
if err := tx.Insert("sessions", sess); err != nil {
return fmt.Errorf("failed inserting session: %s", err)
}
// Insert the check mappings
for _, checkID := range sess.Checks {
mapping := &sessionCheck{
Node: sess.Node,
CheckID: checkID,
Session: sess.ID,
}
if err := tx.Insert("session_checks", mapping); err != nil {
return fmt.Errorf("failed inserting session check mapping: %s", err)
}
}
// Update the index
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// SessionGet is used to retrieve an active session from the state store.
2017-04-21 00:46:29 +00:00
func (s *Store) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Look up the session by its ID
watchCh, session, err := tx.FirstWatch("sessions", "id", sessionID)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(watchCh)
if session != nil {
return idx, session.(*structs.Session), nil
}
return idx, nil, nil
}
// SessionList returns a slice containing all of the active sessions.
2017-04-21 00:46:29 +00:00
func (s *Store) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Query all of the active sessions.
sessions, err := tx.Get("sessions", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(sessions.WatchCh())
// Go over the sessions and create a slice of them.
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}
// NodeSessions returns a set of active sessions associated
// with the given node ID. The returned index is the highest
// index seen from the result set.
2017-04-21 00:46:29 +00:00
func (s *Store) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
// Get all of the sessions which belong to the node
sessions, err := tx.Get("sessions", "node", nodeID)
if err != nil {
return 0, nil, fmt.Errorf("failed session lookup: %s", err)
}
ws.Add(sessions.WatchCh())
// Go over all of the sessions and return them as a slice
var result structs.Sessions
for session := sessions.Next(); session != nil; session = sessions.Next() {
result = append(result, session.(*structs.Session))
}
return idx, result, nil
}
// SessionDestroy is used to remove an active session. This will
// implicitly invalidate the session and invoke the specified
// session destroy behavior.
2017-04-21 00:46:29 +00:00
func (s *Store) SessionDestroy(idx uint64, sessionID string) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Call the session deletion.
2017-01-24 19:53:02 +00:00
if err := s.deleteSessionTxn(tx, idx, sessionID); err != nil {
return err
}
tx.Commit()
return nil
}
// deleteSessionTxn is the inner method, which is used to do the actual
2017-01-24 19:53:02 +00:00
// session deletion and handle session invalidation, etc.
2017-04-21 00:46:29 +00:00
func (s *Store) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
// Look up the session.
sess, err := tx.First("sessions", "id", sessionID)
if err != nil {
return fmt.Errorf("failed session lookup: %s", err)
}
if sess == nil {
return nil
}
// Delete the session and write the new index.
if err := tx.Delete("sessions", sess); err != nil {
return fmt.Errorf("failed deleting session: %s", err)
}
if err := tx.Insert("index", &IndexEntry{"sessions", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
// Enforce the max lock delay.
session := sess.(*structs.Session)
delay := session.LockDelay
if delay > structs.MaxLockDelay {
delay = structs.MaxLockDelay
}
// Snag the current now time so that all the expirations get calculated
// the same way.
now := time.Now()
// Get an iterator over all of the keys with the given session.
entries, err := tx.Get("kvs", "session", sessionID)
if err != nil {
return fmt.Errorf("failed kvs lookup: %s", err)
}
var kvs []interface{}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
kvs = append(kvs, entry)
}
// Invalidate any held locks.
switch session.Behavior {
case structs.SessionKeysRelease:
for _, obj := range kvs {
// Note that we clone here since we are modifying the
// returned object and want to make sure our set op
// respects the transaction we are in.
e := obj.(*structs.DirEntry).Clone()
e.Session = ""
if err := s.kvsSetTxn(tx, idx, e, true); err != nil {
return fmt.Errorf("failed kvs update: %s", err)
}
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
}
}
case structs.SessionKeysDelete:
for _, obj := range kvs {
e := obj.(*structs.DirEntry)
if err := s.kvsDeleteTxn(tx, idx, e.Key); err != nil {
return fmt.Errorf("failed kvs delete: %s", err)
}
// Apply the lock delay if present.
if delay > 0 {
s.lockDelay.SetExpiration(e.Key, now, delay)
}
}
default:
return fmt.Errorf("unknown session behavior %#v", session.Behavior)
}
// Delete any check mappings.
mappings, err := tx.Get("session_checks", "session", sessionID)
if err != nil {
return fmt.Errorf("failed session checks lookup: %s", err)
}
{
var objs []interface{}
for mapping := mappings.Next(); mapping != nil; mapping = mappings.Next() {
objs = append(objs, mapping)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs {
if err := tx.Delete("session_checks", obj); err != nil {
return fmt.Errorf("failed deleting session check: %s", err)
}
}
}
// Delete any prepared queries.
queries, err := tx.Get("prepared-queries", "session", sessionID)
if err != nil {
return fmt.Errorf("failed prepared query lookup: %s", err)
}
{
var ids []string
for wrapped := queries.Next(); wrapped != nil; wrapped = queries.Next() {
ids = append(ids, toPreparedQuery(wrapped).ID)
}
// Do the delete in a separate loop so we don't trash the iterator.
for _, id := range ids {
2017-01-24 19:53:02 +00:00
if err := s.preparedQueryDeleteTxn(tx, idx, id); err != nil {
return fmt.Errorf("failed prepared query delete: %s", err)
}
}
}
return nil
}