package consul import ( "fmt" "io" "io/ioutil" "log" "os" "runtime" "strings" "sync" "time" "github.com/armon/go-radix" "github.com/armon/gomdb" "github.com/hashicorp/consul/consul/structs" ) const ( dbNodes = "nodes" dbServices = "services" dbChecks = "checks" dbKVS = "kvs" dbTombstone = "tombstones" dbSessions = "sessions" dbSessionChecks = "sessionChecks" dbACLs = "acls" dbMaxMapSize32bit uint64 = 128 * 1024 * 1024 // 128MB maximum size dbMaxMapSize64bit uint64 = 32 * 1024 * 1024 * 1024 // 32GB maximum size dbMaxReaders uint = 4096 // 4K, default is 126 ) // kvMode is used internally to control which type of set // operation we are performing type kvMode int const ( kvSet kvMode = iota kvCAS kvLock kvUnlock ) // The StateStore is responsible for maintaining all the Consul // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide // high concurrency for read operations without blocking writes, and // to provide write availability in the face of reads. The current // implementation uses the Lightning Memory-Mapped Database (MDB). // This gives us Multi-Version Concurrency Control for "free" type StateStore struct { logger *log.Logger path string env *mdb.Env nodeTable *MDBTable serviceTable *MDBTable checkTable *MDBTable kvsTable *MDBTable tombstoneTable *MDBTable sessionTable *MDBTable sessionCheckTable *MDBTable aclTable *MDBTable tables MDBTables watch map[*MDBTable]*NotifyGroup queryTables map[string]MDBTables // kvWatch is a more optimized way of watching for KV changes. // Instead of just using a NotifyGroup for the entire table, // a watcher is instantiated on a given prefix. When a change happens, // only the relevant watchers are woken up. This reduces the cost of // watching for KV changes. kvWatch *radix.Tree kvWatchLock sync.Mutex // lockDelay is used to mark certain locks as unacquirable. // When a lock is forcefully released (failing health // check, destroyed session, etc), it is subject to the LockDelay // impossed by the session. This prevents another session from // acquiring the lock for some period of time as a protection against // split-brains. This is inspired by the lock-delay in Chubby. // Because this relies on wall-time, we cannot assume all peers // perceive time as flowing uniformly. This means KVSLock MUST ignore // lockDelay, since the lockDelay may have expired on the leader, // but not on the follower. Rejecting the lock could result in // inconsistencies in the FSMs due to the rate time progresses. Instead, // only the opinion of the leader is respected, and the Raft log // is never questioned. lockDelay map[string]time.Time lockDelayLock sync.RWMutex // GC is when we create tombstones to track their time-to-live. // The GC is consumed upstream to manage clearing of tombstones. gc *TombstoneGC } // StateSnapshot is used to provide a point-in-time snapshot // It works by starting a readonly transaction against all tables. type StateSnapshot struct { store *StateStore tx *MDBTxn lastIndex uint64 } // sessionCheck is used to create a many-to-one table such // that each check registered by a session can be mapped back // to the session row. type sessionCheck struct { Node string CheckID string Session string } // Close is used to abort the transaction and allow for cleanup func (s *StateSnapshot) Close() error { s.tx.Abort() return nil } // NewStateStore is used to create a new state store func NewStateStore(gc *TombstoneGC, logOutput io.Writer) (*StateStore, error) { // Create a new temp dir path, err := ioutil.TempDir("", "consul") if err != nil { return nil, err } return NewStateStorePath(gc, path, logOutput) } // NewStateStorePath is used to create a new state store at a given path // The path is cleared on closing. func NewStateStorePath(gc *TombstoneGC, path string, logOutput io.Writer) (*StateStore, error) { // Open the env env, err := mdb.NewEnv() if err != nil { return nil, err } s := &StateStore{ logger: log.New(logOutput, "", log.LstdFlags), path: path, env: env, watch: make(map[*MDBTable]*NotifyGroup), kvWatch: radix.New(), lockDelay: make(map[string]time.Time), gc: gc, } // Ensure we can initialize if err := s.initialize(); err != nil { env.Close() os.RemoveAll(path) return nil, err } return s, nil } // Close is used to safely shutdown the state store func (s *StateStore) Close() error { s.env.Close() os.RemoveAll(s.path) return nil } // initialize is used to setup the store for use func (s *StateStore) initialize() error { // Setup the Env first if err := s.env.SetMaxDBs(mdb.DBI(32)); err != nil { return err } // Set the maximum db size based on 32/64bit. Since we are // doing an mmap underneath, we need to limit our use of virtual // address space on 32bit, but don't have to care on 64bit. dbSize := dbMaxMapSize32bit if runtime.GOARCH == "amd64" { dbSize = dbMaxMapSize64bit } // Increase the maximum map size if err := s.env.SetMapSize(dbSize); err != nil { return err } // Increase the maximum number of concurrent readers // TODO: Block transactions if we could exceed dbMaxReaders if err := s.env.SetMaxReaders(dbMaxReaders); err != nil { return err } // Optimize our flags for speed over safety, since the Raft log + snapshots // are durable. We treat this as an ephemeral in-memory DB, since we nuke // the data anyways. var flags uint = mdb.NOMETASYNC | mdb.NOSYNC | mdb.NOTLS if err := s.env.Open(s.path, flags, 0755); err != nil { return err } // Tables use a generic struct encoder encoder := func(obj interface{}) []byte { buf, err := structs.Encode(255, obj) if err != nil { panic(err) } return buf[1:] } // Setup our tables s.nodeTable = &MDBTable{ Name: dbNodes, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Node"}, CaseInsensitive: true, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.Node) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.serviceTable = &MDBTable{ Name: dbServices, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Node", "ServiceID"}, }, "service": &MDBIndex{ AllowBlank: true, Fields: []string{"ServiceName"}, CaseInsensitive: true, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.ServiceNode) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.checkTable = &MDBTable{ Name: dbChecks, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Node", "CheckID"}, }, "status": &MDBIndex{ Fields: []string{"Status"}, }, "service": &MDBIndex{ AllowBlank: true, Fields: []string{"ServiceName"}, }, "node": &MDBIndex{ AllowBlank: true, Fields: []string{"Node", "ServiceID"}, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.HealthCheck) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.kvsTable = &MDBTable{ Name: dbKVS, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Key"}, }, "id_prefix": &MDBIndex{ Virtual: true, RealIndex: "id", Fields: []string{"Key"}, IdxFunc: DefaultIndexPrefixFunc, }, "session": &MDBIndex{ AllowBlank: true, Fields: []string{"Session"}, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.tombstoneTable = &MDBTable{ Name: dbTombstone, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Key"}, }, "id_prefix": &MDBIndex{ Virtual: true, RealIndex: "id", Fields: []string{"Key"}, IdxFunc: DefaultIndexPrefixFunc, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.sessionTable = &MDBTable{ Name: dbSessions, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"ID"}, }, "node": &MDBIndex{ AllowBlank: true, Fields: []string{"Node"}, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.Session) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.sessionCheckTable = &MDBTable{ Name: dbSessionChecks, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"Node", "CheckID", "Session"}, }, }, Decoder: func(buf []byte) interface{} { out := new(sessionCheck) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } s.aclTable = &MDBTable{ Name: dbACLs, Indexes: map[string]*MDBIndex{ "id": &MDBIndex{ Unique: true, Fields: []string{"ID"}, }, }, Decoder: func(buf []byte) interface{} { out := new(structs.ACL) if err := structs.Decode(buf, out); err != nil { panic(err) } return out }, } // Store the set of tables s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable, s.aclTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder if err := table.Init(); err != nil { return err } // Setup a notification group per table s.watch[table] = &NotifyGroup{} } // Setup the query tables s.queryTables = map[string]MDBTables{ "Nodes": MDBTables{s.nodeTable}, "Services": MDBTables{s.serviceTable}, "ServiceNodes": MDBTables{s.nodeTable, s.serviceTable}, "NodeServices": MDBTables{s.nodeTable, s.serviceTable}, "ChecksInState": MDBTables{s.checkTable}, "NodeChecks": MDBTables{s.checkTable}, "ServiceChecks": MDBTables{s.checkTable}, "CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, "NodeInfo": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, "NodeDump": MDBTables{s.nodeTable, s.serviceTable, s.checkTable}, "SessionGet": MDBTables{s.sessionTable}, "SessionList": MDBTables{s.sessionTable}, "NodeSessions": MDBTables{s.sessionTable}, "ACLGet": MDBTables{s.aclTable}, "ACLList": MDBTables{s.aclTable}, } return nil } // Watch is used to subscribe a channel to a set of MDBTables func (s *StateStore) Watch(tables MDBTables, notify chan struct{}) { for _, t := range tables { s.watch[t].Wait(notify) } } // StopWatch is used to unsubscribe a channel to a set of MDBTables func (s *StateStore) StopWatch(tables MDBTables, notify chan struct{}) { for _, t := range tables { s.watch[t].Clear(notify) } } // WatchKV is used to subscribe a channel to changes in KV data func (s *StateStore) WatchKV(prefix string, notify chan struct{}) { s.kvWatchLock.Lock() defer s.kvWatchLock.Unlock() // Check for an existing notify group if raw, ok := s.kvWatch.Get(prefix); ok { grp := raw.(*NotifyGroup) grp.Wait(notify) return } // Create new notify group grp := &NotifyGroup{} grp.Wait(notify) s.kvWatch.Insert(prefix, grp) } // StopWatchKV is used to unsubscribe a channel from changes in KV data func (s *StateStore) StopWatchKV(prefix string, notify chan struct{}) { s.kvWatchLock.Lock() defer s.kvWatchLock.Unlock() // Check for an existing notify group if raw, ok := s.kvWatch.Get(prefix); ok { grp := raw.(*NotifyGroup) grp.Clear(notify) } } // notifyKV is used to notify any KV listeners of a change // on a prefix func (s *StateStore) notifyKV(path string, prefix bool) { s.kvWatchLock.Lock() defer s.kvWatchLock.Unlock() var toDelete []string fn := func(s string, v interface{}) bool { group := v.(*NotifyGroup) group.Notify() if s != "" { toDelete = append(toDelete, s) } return false } // Invoke any watcher on the path downward to the key. s.kvWatch.WalkPath(path, fn) // If the entire prefix may be affected (e.g. delete tree), // invoke the entire prefix if prefix { s.kvWatch.WalkPrefix(path, fn) } // Delete the old watch groups for i := len(toDelete) - 1; i >= 0; i-- { s.kvWatch.Delete(toDelete[i]) } } // QueryTables returns the Tables that are queried for a given query func (s *StateStore) QueryTables(q string) MDBTables { return s.queryTables[q] } // EnsureRegistration is used to make sure a node, service, and check registration // is performed within a single transaction to avoid race conditions on state updates. func (s *StateStore) EnsureRegistration(index uint64, req *structs.RegisterRequest) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() // Ensure the node node := structs.Node{req.Node, req.Address} if err := s.ensureNodeTxn(index, node, tx); err != nil { return err } // Ensure the service if provided if req.Service != nil { if err := s.ensureServiceTxn(index, req.Node, req.Service, tx); err != nil { return err } } // Ensure the check(s), if provided if req.Check != nil { if err := s.ensureCheckTxn(index, req.Check, tx); err != nil { return err } } for _, check := range req.Checks { if err := s.ensureCheckTxn(index, check, tx); err != nil { return err } } // Commit as one unit return tx.Commit() } // EnsureNode is used to ensure a given node exists, with the provided address func (s *StateStore) EnsureNode(index uint64, node structs.Node) error { tx, err := s.nodeTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() if err := s.ensureNodeTxn(index, node, tx); err != nil { return err } return tx.Commit() } // ensureNodeTxn is used to ensure a given node exists, with the provided address // within a given txn func (s *StateStore) ensureNodeTxn(index uint64, node structs.Node, tx *MDBTxn) error { if err := s.nodeTable.InsertTxn(tx, node); err != nil { return err } if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.nodeTable].Notify() }) return nil } // GetNode returns all the address of the known and if it was found func (s *StateStore) GetNode(name string) (uint64, bool, string) { idx, res, err := s.nodeTable.Get("id", name) if err != nil { s.logger.Printf("[ERR] consul.state: Error during node lookup: %v", err) return 0, false, "" } if len(res) == 0 { return idx, false, "" } return idx, true, res[0].(*structs.Node).Address } // GetNodes returns all the known nodes, the slice alternates between // the node name and address func (s *StateStore) Nodes() (uint64, structs.Nodes) { idx, res, err := s.nodeTable.Get("id") if err != nil { s.logger.Printf("[ERR] consul.state: Error getting nodes: %v", err) } results := make([]structs.Node, len(res)) for i, r := range res { results[i] = *r.(*structs.Node) } return idx, results } // EnsureService is used to ensure a given node exposes a service func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() if err := s.ensureServiceTxn(index, node, ns, tx); err != nil { return nil } return tx.Commit() } // ensureServiceTxn is used to ensure a given node exposes a service in a transaction func (s *StateStore) ensureServiceTxn(index uint64, node string, ns *structs.NodeService, tx *MDBTxn) error { // Ensure the node exists res, err := s.nodeTable.GetTxn(tx, "id", node) if err != nil { return err } if len(res) == 0 { return fmt.Errorf("Missing node registration") } // Create the entry entry := structs.ServiceNode{ Node: node, ServiceID: ns.ID, ServiceName: ns.Service, ServiceTags: ns.Tags, ServiceAddress: ns.Address, ServicePort: ns.Port, } // Ensure the service entry is set if err := s.serviceTable.InsertTxn(tx, &entry); err != nil { return err } if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.serviceTable].Notify() }) return nil } // NodeServices is used to return all the services of a given node func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) { tables := s.queryTables["NodeServices"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() return s.parseNodeServices(tables, tx, name) } // parseNodeServices is used to get the services belonging to a // node, using a given txn func (s *StateStore) parseNodeServices(tables MDBTables, tx *MDBTxn, name string) (uint64, *structs.NodeServices) { ns := &structs.NodeServices{ Services: make(map[string]*structs.NodeService), } // Get the maximum index index, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } // Get the node first res, err := s.nodeTable.GetTxn(tx, "id", name) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get node: %v", err) } if len(res) == 0 { return index, nil } // Set the address node := res[0].(*structs.Node) ns.Node = *node // Get the services res, err = s.serviceTable.GetTxn(tx, "id", name) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get node '%s' services: %v", name, err) } // Add each service for _, r := range res { service := r.(*structs.ServiceNode) srv := &structs.NodeService{ ID: service.ServiceID, Service: service.ServiceName, Tags: service.ServiceTags, Address: service.ServiceAddress, Port: service.ServicePort, } ns.Services[srv.ID] = srv } return index, ns } // DeleteNodeService is used to delete a node service func (s *StateStore) DeleteNodeService(index uint64, node, id string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() if n, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil { return err } else if n > 0 { if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.serviceTable].Notify() }) } // Invalidate any sessions using these checks checks, err := s.checkTable.GetTxn(tx, "node", node, id) if err != nil { return err } for _, c := range checks { check := c.(*structs.HealthCheck) if err := s.invalidateCheck(index, tx, node, check.CheckID); err != nil { return err } } if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil { return err } else if n > 0 { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.checkTable].Notify() }) } return tx.Commit() } // DeleteNode is used to delete a node and all it's services func (s *StateStore) DeleteNode(index uint64, node string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() // Invalidate any sessions held by the node if err := s.invalidateNode(index, tx, node); err != nil { return err } if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil { return err } else if n > 0 { if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.serviceTable].Notify() }) } if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil { return err } else if n > 0 { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.checkTable].Notify() }) } if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil { return err } else if n > 0 { if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.nodeTable].Notify() }) } return tx.Commit() } // Services is used to return all the services with a list of associated tags func (s *StateStore) Services() (uint64, map[string][]string) { services := make(map[string][]string) idx, res, err := s.serviceTable.Get("id") if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get services: %v", err) return idx, services } for _, r := range res { srv := r.(*structs.ServiceNode) tags, ok := services[srv.ServiceName] if !ok { services[srv.ServiceName] = make([]string, 0) } for _, tag := range srv.ServiceTags { if !strContains(tags, tag) { tags = append(tags, tag) services[srv.ServiceName] = tags } } } return idx, services } // ServiceNodes returns the nodes associated with a given service func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) { tables := s.queryTables["ServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.serviceTable.GetTxn(tx, "service", service) return idx, s.parseServiceNodes(tx, s.nodeTable, res, err) } // ServiceTagNodes returns the nodes associated with a given service matching a tag func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) { tables := s.queryTables["ServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.serviceTable.GetTxn(tx, "service", service) res = serviceTagFilter(res, tag) return idx, s.parseServiceNodes(tx, s.nodeTable, res, err) } // serviceTagFilter is used to filter a list of *structs.ServiceNode which do // not have the specified tag func serviceTagFilter(l []interface{}, tag string) []interface{} { n := len(l) for i := 0; i < n; i++ { srv := l[i].(*structs.ServiceNode) if !strContains(ToLowerList(srv.ServiceTags), strings.ToLower(tag)) { l[i], l[n-1] = l[n-1], nil i-- n-- } } return l[:n] } // parseServiceNodes parses results ServiceNodes and ServiceTagNodes func (s *StateStore) parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error) structs.ServiceNodes { nodes := make(structs.ServiceNodes, len(res)) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get service nodes: %v", err) return nodes } for i, r := range res { srv := r.(*structs.ServiceNode) // Get the address of the node nodeRes, err := table.GetTxn(tx, "id", srv.Node) if err != nil || len(nodeRes) != 1 { s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err) continue } srv.Address = nodeRes[0].(*structs.Node).Address nodes[i] = *srv } return nodes } // EnsureCheck is used to create a check or updates it's state func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() if err := s.ensureCheckTxn(index, check, tx); err != nil { return err } return tx.Commit() } // ensureCheckTxn is used to create a check or updates it's state in a transaction func (s *StateStore) ensureCheckTxn(index uint64, check *structs.HealthCheck, tx *MDBTxn) error { // Ensure we have a status if check.Status == "" { check.Status = structs.HealthCritical } // Ensure the node exists res, err := s.nodeTable.GetTxn(tx, "id", check.Node) if err != nil { return err } if len(res) == 0 { return fmt.Errorf("Missing node registration") } // Ensure the service exists if specified if check.ServiceID != "" { res, err = s.serviceTable.GetTxn(tx, "id", check.Node, check.ServiceID) if err != nil { return err } if len(res) == 0 { return fmt.Errorf("Missing service registration") } // Ensure we set the correct service srv := res[0].(*structs.ServiceNode) check.ServiceName = srv.ServiceName } // Invalidate any sessions if status is critical if check.Status == structs.HealthCritical { err := s.invalidateCheck(index, tx, check.Node, check.CheckID) if err != nil { return err } } // Ensure the check is set if err := s.checkTable.InsertTxn(tx, check); err != nil { return err } if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.checkTable].Notify() }) return nil } // DeleteNodeCheck is used to delete a node health check func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error { tx, err := s.tables.StartTxn(false) if err != nil { return err } defer tx.Abort() // Invalidate any sessions held by this check if err := s.invalidateCheck(index, tx, node, id); err != nil { return err } if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil { return err } else if n > 0 { if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.checkTable].Notify() }) } return tx.Commit() } // NodeChecks is used to get all the checks for a node func (s *StateStore) NodeChecks(node string) (uint64, structs.HealthChecks) { return s.parseHealthChecks(s.checkTable.Get("id", node)) } // ServiceChecks is used to get all the checks for a service func (s *StateStore) ServiceChecks(service string) (uint64, structs.HealthChecks) { return s.parseHealthChecks(s.checkTable.Get("service", service)) } // CheckInState is used to get all the checks for a service in a given state func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks) { var idx uint64 var res []interface{} var err error if state == structs.HealthAny { idx, res, err = s.checkTable.Get("id") } else { idx, res, err = s.checkTable.Get("status", state) } return s.parseHealthChecks(idx, res, err) } // parseHealthChecks is used to handle the resutls of a Get against // the checkTable func (s *StateStore) parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, structs.HealthChecks) { results := make([]*structs.HealthCheck, len(res)) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get health checks: %v", err) return idx, results } for i, r := range res { results[i] = r.(*structs.HealthCheck) } return idx, results } // CheckServiceNodes returns the nodes associated with a given service, along // with any associated check func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) { tables := s.queryTables["CheckServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.serviceTable.GetTxn(tx, "service", service) return idx, s.parseCheckServiceNodes(tx, res, err) } // CheckServiceNodes returns the nodes associated with a given service, along // with any associated checks func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) { tables := s.queryTables["CheckServiceNodes"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.serviceTable.GetTxn(tx, "service", service) res = serviceTagFilter(res, tag) return idx, s.parseCheckServiceNodes(tx, res, err) } // parseCheckServiceNodes parses results CheckServiceNodes and CheckServiceTagNodes func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err error) structs.CheckServiceNodes { nodes := make(structs.CheckServiceNodes, len(res)) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get service nodes: %v", err) return nodes } for i, r := range res { srv := r.(*structs.ServiceNode) // Get the node nodeRes, err := s.nodeTable.GetTxn(tx, "id", srv.Node) if err != nil || len(nodeRes) != 1 { s.logger.Printf("[ERR] consul.state: Failed to join service node %#v with node: %v", *srv, err) continue } // Get any associated checks of the service res, err := s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID) _, checks := s.parseHealthChecks(0, res, err) // Get any checks of the node, not assciated with any service res, err = s.checkTable.GetTxn(tx, "node", srv.Node, "") _, nodeChecks := s.parseHealthChecks(0, res, err) checks = append(checks, nodeChecks...) // Setup the node nodes[i].Node = *nodeRes[0].(*structs.Node) nodes[i].Service = structs.NodeService{ ID: srv.ServiceID, Service: srv.ServiceName, Tags: srv.ServiceTags, Address: srv.ServiceAddress, Port: srv.ServicePort, } nodes[i].Checks = checks } return nodes } // NodeInfo is used to generate the full info about a node. func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump) { tables := s.queryTables["NodeInfo"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.nodeTable.GetTxn(tx, "id", node) return idx, s.parseNodeInfo(tx, res, err) } // NodeDump is used to generate the NodeInfo for all nodes. This is very expensive, // and should generally be avoided for programatic access. func (s *StateStore) NodeDump() (uint64, structs.NodeDump) { tables := s.queryTables["NodeDump"] tx, err := tables.StartTxn(true) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { panic(fmt.Errorf("Failed to get last index: %v", err)) } res, err := s.nodeTable.GetTxn(tx, "id") return idx, s.parseNodeInfo(tx, res, err) } // parseNodeInfo is used to scan over the results of a node // iteration and generate a NodeDump func (s *StateStore) parseNodeInfo(tx *MDBTxn, res []interface{}, err error) structs.NodeDump { dump := make(structs.NodeDump, 0, len(res)) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get nodes: %v", err) return dump } for _, r := range res { // Copy the address and node node := r.(*structs.Node) info := &structs.NodeInfo{ Node: node.Node, Address: node.Address, } // Get any services of the node res, err = s.serviceTable.GetTxn(tx, "id", node.Node) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get node services: %v", err) } info.Services = make([]*structs.NodeService, 0, len(res)) for _, r := range res { service := r.(*structs.ServiceNode) srv := &structs.NodeService{ ID: service.ServiceID, Service: service.ServiceName, Tags: service.ServiceTags, Address: service.ServiceAddress, Port: service.ServicePort, } info.Services = append(info.Services, srv) } // Get any checks of the node res, err = s.checkTable.GetTxn(tx, "node", node.Node) if err != nil { s.logger.Printf("[ERR] consul.state: Failed to get node checks: %v", err) } info.Checks = make([]*structs.HealthCheck, 0, len(res)) for _, r := range res { chk := r.(*structs.HealthCheck) info.Checks = append(info.Checks, chk) } // Add the node info dump = append(dump, info) } return dump } // KVSSet is used to create or update a KV entry func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error { _, err := s.kvsSet(index, d, kvSet) return err } // KVSRestore is used to restore a DirEntry. It should only be used when // doing a restore, otherwise KVSSet should be used. func (s *StateStore) KVSRestore(d *structs.DirEntry) error { // Start a new txn tx, err := s.kvsTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() if err := s.kvsTable.InsertTxn(tx, d); err != nil { return err } if err := s.kvsTable.SetMaxLastIndexTxn(tx, d.ModifyIndex); err != nil { return err } return tx.Commit() } // KVSGet is used to get a KV entry func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) { idx, res, err := s.kvsTable.Get("id", key) var d *structs.DirEntry if len(res) > 0 { d = res[0].(*structs.DirEntry) } return idx, d, err } // KVSList is used to list all KV entries with a prefix func (s *StateStore) KVSList(prefix string) (uint64, uint64, structs.DirEntries, error) { tables := MDBTables{s.kvsTable, s.tombstoneTable} tx, err := tables.StartTxn(true) if err != nil { return 0, 0, nil, err } defer tx.Abort() idx, err := tables.LastIndexTxn(tx) if err != nil { return 0, 0, nil, err } res, err := s.kvsTable.GetTxn(tx, "id_prefix", prefix) if err != nil { return 0, 0, nil, err } ents := make(structs.DirEntries, len(res)) for idx, r := range res { ents[idx] = r.(*structs.DirEntry) } // Check for the higest index in the tombstone table var maxIndex uint64 res, err = s.tombstoneTable.GetTxn(tx, "id_prefix", prefix) for _, r := range res { ent := r.(*structs.DirEntry) if ent.ModifyIndex > maxIndex { maxIndex = ent.ModifyIndex } } return maxIndex, idx, ents, err } // KVSListKeys is used to list keys with a prefix, and up to a given seperator func (s *StateStore) KVSListKeys(prefix, seperator string) (uint64, []string, error) { tables := MDBTables{s.kvsTable, s.tombstoneTable} tx, err := tables.StartTxn(true) if err != nil { return 0, nil, err } defer tx.Abort() idx, err := s.kvsTable.LastIndexTxn(tx) if err != nil { return 0, nil, err } // Ensure a non-zero index if idx == 0 { // Must provide non-zero index to prevent blocking // Index 1 is impossible anyways (due to Raft internals) idx = 1 } // Aggregate the stream stream := make(chan interface{}, 128) streamTomb := make(chan interface{}, 128) done := make(chan struct{}) var keys []string var maxIndex uint64 go func() { prefixLen := len(prefix) sepLen := len(seperator) last := "" for raw := range stream { ent := raw.(*structs.DirEntry) after := ent.Key[prefixLen:] // Update the hightest index we've seen if ent.ModifyIndex > maxIndex { maxIndex = ent.ModifyIndex } // If there is no seperator, always accumulate if sepLen == 0 { keys = append(keys, ent.Key) continue } // Check for the seperator if idx := strings.Index(after, seperator); idx >= 0 { toSep := ent.Key[:prefixLen+idx+sepLen] if last != toSep { keys = append(keys, toSep) last = toSep } } else { keys = append(keys, ent.Key) } } // Handle the tombstones for any index updates for raw := range streamTomb { ent := raw.(*structs.DirEntry) if ent.ModifyIndex > maxIndex { maxIndex = ent.ModifyIndex } } close(done) }() // Start the stream, and wait for completion if err = s.kvsTable.StreamTxn(stream, tx, "id_prefix", prefix); err != nil { return 0, nil, err } if err := s.tombstoneTable.StreamTxn(streamTomb, tx, "id_prefix", prefix); err != nil { return 0, nil, err } <-done // Use the maxIndex if we have any keys if maxIndex != 0 { idx = maxIndex } return idx, keys, nil } // KVSDelete is used to delete a KVS entry func (s *StateStore) KVSDelete(index uint64, key string) error { return s.kvsDeleteWithIndex(index, "id", key) } // KVSDeleteCheckAndSet is used to perform an atomic delete check-and-set func (s *StateStore) KVSDeleteCheckAndSet(index uint64, key string, casIndex uint64) (bool, error) { tx, err := s.tables.StartTxn(false) if err != nil { return false, err } defer tx.Abort() // Get the existing node res, err := s.kvsTable.GetTxn(tx, "id", key) if err != nil { return false, err } // Get the existing node if any var exist *structs.DirEntry if len(res) > 0 { exist = res[0].(*structs.DirEntry) } // Use the casIndex as the constraint. A modify time of 0 means // we are doign a delete-if-not-exists (odd...), while any other // value means we expect that modify time. if casIndex == 0 { return exist == nil, nil } else if casIndex > 0 && (exist == nil || exist.ModifyIndex != casIndex) { return false, nil } // Do the actual delete if err := s.kvsDeleteWithIndexTxn(index, tx, "id", key); err != nil { return false, err } return true, tx.Commit() } // KVSDeleteTree is used to delete all keys with a given prefix func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { if prefix == "" { return s.kvsDeleteWithIndex(index, "id") } return s.kvsDeleteWithIndex(index, "id_prefix", prefix) } // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { tx, err := s.tables.StartTxn(false) if err != nil { return err } defer tx.Abort() if err := s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...); err != nil { return err } return tx.Commit() } // kvsDeleteWithIndexTxn does a delete within an existing transaction func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { num := 0 for { // Get some number of entries to delete pairs, err := s.kvsTable.GetTxnLimit(tx, 128, tableIndex, parts...) if err != nil { return err } // Create the tombstones and delete for _, raw := range pairs { ent := raw.(*structs.DirEntry) ent.ModifyIndex = index // Update the index ent.Value = nil // Reduce storage required ent.Session = "" if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { return err } if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { return err } else if num != 1 { return fmt.Errorf("Failed to delete key '%s'", ent.Key) } } // Increment the total number num += len(pairs) if len(pairs) == 0 { break } } if num > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { // Trigger the most fine grained notifications if possible switch { case len(parts) == 0: s.notifyKV("", true) case tableIndex == "id": s.notifyKV(parts[0], false) case tableIndex == "id_prefix": s.notifyKV(parts[0], true) default: s.notifyKV("", true) } if s.gc != nil { // If GC is configured, then we hint that this index // required expiration. s.gc.Hint(index) } }) } return nil } // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvCAS) } // KVSLock works like KVSSet but only writes if the lock can be acquired func (s *StateStore) KVSLock(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvLock) } // KVSUnlock works like KVSSet but only writes if the lock can be unlocked func (s *StateStore) KVSUnlock(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvUnlock) } // KVSLockDelay returns the expiration time of a key lock delay. A key may // have a lock delay if it was unlocked due to a session invalidation instead // of a graceful unlock. This must be checked on the leader node, and not in // KVSLock due to the variability of clocks. func (s *StateStore) KVSLockDelay(key string) time.Time { s.lockDelayLock.RLock() expires := s.lockDelay[key] s.lockDelayLock.RUnlock() return expires } // kvsSet is the internal setter func (s *StateStore) kvsSet( index uint64, d *structs.DirEntry, mode kvMode) (bool, error) { // Start a new txn tx, err := s.tables.StartTxn(false) if err != nil { return false, err } defer tx.Abort() // Get the existing node res, err := s.kvsTable.GetTxn(tx, "id", d.Key) if err != nil { return false, err } // Get the existing node if any var exist *structs.DirEntry if len(res) > 0 { exist = res[0].(*structs.DirEntry) } // Use the ModifyIndex as the constraint. A modify of time of 0 // means we are doing a set-if-not-exists, while any other value // means we expect that modify time. if mode == kvCAS { if d.ModifyIndex == 0 && exist != nil { return false, nil } else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) { return false, nil } } // If attempting to lock, check this is possible if mode == kvLock { // Verify we have a session if d.Session == "" { return false, fmt.Errorf("Missing session") } // Bail if it is already locked if exist != nil && exist.Session != "" { return false, nil } // Verify the session exists res, err := s.sessionTable.GetTxn(tx, "id", d.Session) if err != nil { return false, err } if len(res) == 0 { return false, fmt.Errorf("Invalid session") } // Update the lock index if exist != nil { exist.LockIndex++ exist.Session = d.Session } else { d.LockIndex = 1 } } // If attempting to unlock, verify the key exists and is held if mode == kvUnlock { if exist == nil || exist.Session != d.Session { return false, nil } // Clear the session to unlock exist.Session = "" } // Set the create and modify times if exist == nil { d.CreateIndex = index } else { d.CreateIndex = exist.CreateIndex d.LockIndex = exist.LockIndex d.Session = exist.Session } d.ModifyIndex = index if err := s.kvsTable.InsertTxn(tx, d); err != nil { return false, err } if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return false, err } tx.Defer(func() { s.notifyKV(d.Key, false) }) return true, tx.Commit() } // ReapTombstones is used to delete all the tombstones with a ModifyTime // less than or equal to the given index. This is used to prevent unbounded // storage growth of the tombstones. func (s *StateStore) ReapTombstones(index uint64) error { tx, err := s.tombstoneTable.StartTxn(false, nil) if err != nil { return fmt.Errorf("failed to start txn: %v", err) } defer tx.Abort() // Scan the tombstone table for all the entries that are // eligble for GC. This could be improved by indexing on // ModifyTime and doing a less-than-equals scan, however // we don't currently support numeric indexes internally. // Luckily, this is a low frequency operation. var toDelete []string streamCh := make(chan interface{}, 128) doneCh := make(chan struct{}) go func() { defer close(doneCh) for raw := range streamCh { ent := raw.(*structs.DirEntry) if ent.ModifyIndex <= index { toDelete = append(toDelete, ent.Key) } } }() if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil { s.logger.Printf("[ERR] consul.state: failed to scan tombstones: %v", err) return fmt.Errorf("failed to scan tombstones: %v", err) } <-doneCh // Delete each tombstone if len(toDelete) > 0 { s.logger.Printf("[DEBUG] consul.state: reaping %d tombstones up to %d", len(toDelete), index) } for _, key := range toDelete { num, err := s.tombstoneTable.DeleteTxn(tx, "id", key) if err != nil { s.logger.Printf("[ERR] consul.state: failed to delete tombstone: %v", err) return fmt.Errorf("failed to delete tombstone: %v", err) } if num != 1 { return fmt.Errorf("failed to delete tombstone '%s'", key) } } return tx.Commit() } // TombstoneRestore is used to restore a tombstone. // It should only be used when doing a restore. func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error { // Start a new txn tx, err := s.tombstoneTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() if err := s.tombstoneTable.InsertTxn(tx, d); err != nil { return err } return tx.Commit() } // SessionCreate is used to create a new session. The // ID will be populated on a successful return func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { // Verify a Session ID is generated if session.ID == "" { return fmt.Errorf("Missing Session ID") } switch session.Behavior { case "": // Default behavior is Release for backwards compatibility session.Behavior = structs.SessionKeysRelease case structs.SessionKeysRelease: case structs.SessionKeysDelete: default: return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior) } // Assign the create index session.CreateIndex = index // Start the transaction tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() // Verify that the node exists res, err := s.nodeTable.GetTxn(tx, "id", session.Node) if err != nil { return err } if len(res) == 0 { return fmt.Errorf("Missing node registration") } // Verify that the checks exist and are not critical for _, checkId := range session.Checks { res, err := s.checkTable.GetTxn(tx, "id", session.Node, checkId) if err != nil { return err } if len(res) == 0 { return fmt.Errorf("Missing check '%s' registration", checkId) } chk := res[0].(*structs.HealthCheck) if chk.Status == structs.HealthCritical { return fmt.Errorf("Check '%s' is in %s state", checkId, chk.Status) } } // Insert the session if err := s.sessionTable.InsertTxn(tx, session); err != nil { return err } // Insert the check mappings sCheck := sessionCheck{Node: session.Node, Session: session.ID} for _, checkID := range session.Checks { sCheck.CheckID = checkID if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { return err } } // Trigger the update notifications if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.sessionTable].Notify() }) return tx.Commit() } // SessionRestore is used to restore a session. It should only be used when // doing a restore, otherwise SessionCreate should be used. func (s *StateStore) SessionRestore(session *structs.Session) error { // Start the transaction tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() // Insert the session if err := s.sessionTable.InsertTxn(tx, session); err != nil { return err } // Insert the check mappings sCheck := sessionCheck{Node: session.Node, Session: session.ID} for _, checkID := range session.Checks { sCheck.CheckID = checkID if err := s.sessionCheckTable.InsertTxn(tx, &sCheck); err != nil { return err } } // Trigger the update notifications index := session.CreateIndex if err := s.sessionTable.SetMaxLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.sessionTable].Notify() }) return tx.Commit() } // SessionGet is used to get a session entry func (s *StateStore) SessionGet(id string) (uint64, *structs.Session, error) { idx, res, err := s.sessionTable.Get("id", id) var d *structs.Session if len(res) > 0 { d = res[0].(*structs.Session) } return idx, d, err } // SessionList is used to list all the open sessions func (s *StateStore) SessionList() (uint64, []*structs.Session, error) { idx, res, err := s.sessionTable.Get("id") out := make([]*structs.Session, len(res)) for i, raw := range res { out[i] = raw.(*structs.Session) } return idx, out, err } // NodeSessions is used to list all the open sessions for a node func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, error) { idx, res, err := s.sessionTable.Get("node", node) out := make([]*structs.Session, len(res)) for i, raw := range res { out[i] = raw.(*structs.Session) } return idx, out, err } // SessionDestroy is used to destroy a session. func (s *StateStore) SessionDestroy(index uint64, id string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy", id) if err := s.invalidateSession(index, tx, id); err != nil { return err } return tx.Commit() } // invalideNode is used to invalide all sessions belonging to a node // All tables should be locked in the tx. func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error { sessions, err := s.sessionTable.GetTxn(tx, "node", node) if err != nil { return err } for _, sess := range sessions { session := sess.(*structs.Session).ID s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation", session, node) if err := s.invalidateSession(index, tx, session); err != nil { return err } } return nil } // invalidateCheck is used to invalide all sessions belonging to a check // All tables should be locked in the tx. func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check string) error { sessionChecks, err := s.sessionCheckTable.GetTxn(tx, "id", node, check) if err != nil { return err } for _, sc := range sessionChecks { session := sc.(*sessionCheck).Session s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation", session, check) if err := s.invalidateSession(index, tx, session); err != nil { return err } } return nil } // invalidateSession is used to invalide a session within a given txn // All tables should be locked in the tx. func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) error { // Get the session res, err := s.sessionTable.GetTxn(tx, "id", id) if err != nil { return err } // Quit if this session does not exist if len(res) == 0 { return nil } session := res[0].(*structs.Session) // Enforce the MaxLockDelay delay := session.LockDelay if delay > structs.MaxLockDelay { delay = structs.MaxLockDelay } // Invalidate any held locks if session.Behavior == structs.SessionKeysDelete { if err := s.deleteLocks(index, tx, delay, id); err != nil { return err } } else if err := s.invalidateLocks(index, tx, delay, id); err != nil { return err } // Nuke the session if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil { return err } // Delete the check mappings for _, checkID := range session.Checks { if _, err := s.sessionCheckTable.DeleteTxn(tx, "id", session.Node, checkID, id); err != nil { return err } } // Trigger the update notifications if err := s.sessionTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.sessionTable].Notify() }) return nil } // invalidateLocks is used to invalidate all the locks held by a session // within a given txn. All tables should be locked in the tx. func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, lockDelay time.Duration, id string) error { pairs, err := s.kvsTable.GetTxn(tx, "session", id) if err != nil { return err } var expires time.Time if lockDelay > 0 { s.lockDelayLock.Lock() defer s.lockDelayLock.Unlock() expires = time.Now().Add(lockDelay) } for _, pair := range pairs { kv := pair.(*structs.DirEntry) kv.Session = "" // Clear the lock kv.ModifyIndex = index // Update the modified time if err := s.kvsTable.InsertTxn(tx, kv); err != nil { return err } // If there is a lock delay, prevent acquisition // for at least lockDelay period if lockDelay > 0 { s.lockDelay[kv.Key] = expires time.AfterFunc(lockDelay, func() { s.lockDelayLock.Lock() delete(s.lockDelay, kv.Key) s.lockDelayLock.Unlock() }) } tx.Defer(func() { s.notifyKV(kv.Key, false) }) } if len(pairs) > 0 { if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { return err } } return nil } // deleteLocks is used to delete all the locks held by a session // within a given txn. All tables should be locked in the tx. func (s *StateStore) deleteLocks(index uint64, tx *MDBTxn, lockDelay time.Duration, id string) error { pairs, err := s.kvsTable.GetTxn(tx, "session", id) if err != nil { return err } var expires time.Time if lockDelay > 0 { s.lockDelayLock.Lock() defer s.lockDelayLock.Unlock() expires = time.Now().Add(lockDelay) } for _, pair := range pairs { kv := pair.(*structs.DirEntry) if err := s.kvsDeleteWithIndexTxn(index, tx, "id", kv.Key); err != nil { return err } // If there is a lock delay, prevent acquisition // for at least lockDelay period if lockDelay > 0 { s.lockDelay[kv.Key] = expires time.AfterFunc(lockDelay, func() { s.lockDelayLock.Lock() delete(s.lockDelay, kv.Key) s.lockDelayLock.Unlock() }) } } return nil } // ACLSet is used to create or update an ACL entry func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { // Check for an ID if acl.ID == "" { return fmt.Errorf("Missing ACL ID") } // Start a new txn tx, err := s.tables.StartTxn(false) if err != nil { return err } defer tx.Abort() // Look for the existing node res, err := s.aclTable.GetTxn(tx, "id", acl.ID) if err != nil { return err } switch len(res) { case 0: acl.CreateIndex = index acl.ModifyIndex = index case 1: exist := res[0].(*structs.ACL) acl.CreateIndex = exist.CreateIndex acl.ModifyIndex = index default: panic(fmt.Errorf("Duplicate ACL definition. Internal error")) } // Insert the ACL if err := s.aclTable.InsertTxn(tx, acl); err != nil { return err } // Trigger the update notifications if err := s.aclTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.aclTable].Notify() }) return tx.Commit() } // ACLRestore is used to restore an ACL. It should only be used when // doing a restore, otherwise ACLSet should be used. func (s *StateStore) ACLRestore(acl *structs.ACL) error { // Start a new txn tx, err := s.aclTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() if err := s.aclTable.InsertTxn(tx, acl); err != nil { return err } if err := s.aclTable.SetMaxLastIndexTxn(tx, acl.ModifyIndex); err != nil { return err } return tx.Commit() } // ACLGet is used to get an ACL by ID func (s *StateStore) ACLGet(id string) (uint64, *structs.ACL, error) { idx, res, err := s.aclTable.Get("id", id) var d *structs.ACL if len(res) > 0 { d = res[0].(*structs.ACL) } return idx, d, err } // ACLList is used to list all the acls func (s *StateStore) ACLList() (uint64, []*structs.ACL, error) { idx, res, err := s.aclTable.Get("id") out := make([]*structs.ACL, len(res)) for i, raw := range res { out[i] = raw.(*structs.ACL) } return idx, out, err } // ACLDelete is used to remove an ACL func (s *StateStore) ACLDelete(index uint64, id string) error { tx, err := s.tables.StartTxn(false) if err != nil { panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() if n, err := s.aclTable.DeleteTxn(tx, "id", id); err != nil { return err } else if n > 0 { if err := s.aclTable.SetLastIndexTxn(tx, index); err != nil { return err } tx.Defer(func() { s.watch[s.aclTable].Notify() }) } return tx.Commit() } // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables tx, err := s.tables.StartTxn(true) if err != nil { return nil, err } // Determine the max index index, err := s.tables.LastIndexTxn(tx) if err != nil { tx.Abort() return nil, err } // Return the snapshot snap := &StateSnapshot{ store: s, tx: tx, lastIndex: index, } return snap, nil } // LastIndex returns the last index that affects the snapshotted data func (s *StateSnapshot) LastIndex() uint64 { return s.lastIndex } // Nodes returns all the known nodes, the slice alternates between // the node name and address func (s *StateSnapshot) Nodes() structs.Nodes { res, err := s.store.nodeTable.GetTxn(s.tx, "id") if err != nil { s.store.logger.Printf("[ERR] consul.state: Failed to get nodes: %v", err) return nil } results := make([]structs.Node, len(res)) for i, r := range res { results[i] = *r.(*structs.Node) } return results } // NodeServices is used to return all the services of a given node func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices { _, res := s.store.parseNodeServices(s.store.tables, s.tx, name) return res } // NodeChecks is used to return all the checks of a given node func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks { res, err := s.store.checkTable.GetTxn(s.tx, "id", node) _, checks := s.store.parseHealthChecks(s.lastIndex, res, err) return checks } // KVSDump is used to list all KV entries. It takes a channel and streams // back *struct.DirEntry objects. This will block and should be invoked // in a goroutine. func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error { return s.store.kvsTable.StreamTxn(stream, s.tx, "id") } // TombstoneDump is used to dump all tombstone entries. It takes a channel and streams // back *struct.DirEntry objects. This will block and should be invoked // in a goroutine. func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error { return s.store.tombstoneTable.StreamTxn(stream, s.tx, "id") } // SessionList is used to list all the open sessions func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { res, err := s.store.sessionTable.GetTxn(s.tx, "id") out := make([]*structs.Session, len(res)) for i, raw := range res { out[i] = raw.(*structs.Session) } return out, err } // ACLList is used to list all of the ACLs func (s *StateSnapshot) ACLList() ([]*structs.ACL, error) { res, err := s.store.aclTable.GetTxn(s.tx, "id") out := make([]*structs.ACL, len(res)) for i, raw := range res { out[i] = raw.(*structs.ACL) } return out, err }