diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 162eed79c..1114cd2e3 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1250,6 +1250,36 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID return nil } +// ensureCheckCASTxn updates a check only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) (bool, error) { + // Retrieve the existing entry. + existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID)) + if err != nil { + return false, fmt.Errorf("failed health check lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if hc.ModifyIndex == 0 && existing != nil { + return false, nil + } + if hc.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.HealthCheck) + if ok && hc.ModifyIndex != 0 && hc.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureCheckTxn(tx, idx, hc); err != nil { + return false, err + } + + return true, nil +} + // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. @@ -1366,6 +1396,12 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *stru tx := s.db.Txn(false) defer tx.Abort() + return s.nodeCheckTxn(tx, nodeName, checkID) +} + +// nodeCheckTxn is used as the inner method to handle reading a health check +// from the state store. +func (s *Store) nodeCheckTxn(tx *memdb.Txn, nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) { // Get the table index. idx := maxIndexTxn(tx, "checks") @@ -1555,6 +1591,35 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) erro return nil } +// deleteCheckCASTxn is used to try doing a check delete operation with a given +// raft index. If the CAS index specified is not equal to the last bserved index for +// the given check, then the call is a noop, otherwise a normal check delete is invoked. +func (s *Store) deleteCheckCASTxn(tx *memdb.Txn, idx, cidx uint64, node string, checkID types.CheckID) (bool, error) { + // Try to retrieve the existing health check. + hc, err := tx.First("checks", "id", node, string(checkID)) + if err != nil { + return false, fmt.Errorf("check lookup failed: %s", err) + } + if hc == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + existing, ok := hc.(*structs.HealthCheck) + if !ok || existing.ModifyIndex != cidx { + return existing == nil, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil { + return false, err + } + + return true, nil +} + // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error { diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index c90f21634..61bcf042d 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -118,10 +118,68 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention case structs.IntentionOpDelete: return s.intentionDeleteTxn(tx, idx, op.Intention.ID) default: - return fmt.Errorf("unknown Intention verb %q", op.Op) + return fmt.Errorf("unknown Intention op %q", op.Op) } } +// txnCheck handles all Check-related operations. +func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { + var entry *structs.HealthCheck + var err error + + switch op.Verb { + case api.CheckGet: + _, entry, err = s.nodeCheckTxn(tx, op.Check.Node, op.Check.CheckID) + if entry == nil && err == nil { + err = fmt.Errorf("check %q on node %q doesn't exist", op.Check.CheckID, op.Check.Node) + } + + case api.CheckSet: + entry = &op.Check + err = s.ensureCheckTxn(tx, idx, entry) + + case api.CheckCAS: + var ok bool + entry = &op.Check + ok, err = s.ensureCheckCASTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to set check %q on node %q, index is stale", entry.CheckID, entry.Node) + } + + case api.CheckDelete: + err = s.deleteCheckTxn(tx, idx, op.Check.Node, op.Check.CheckID) + + case api.CheckDeleteCAS: + var ok bool + ok, err = s.deleteCheckCASTxn(tx, idx, op.Check.ModifyIndex, op.Check.Node, op.Check.CheckID) + if !ok && err == nil { + err = fmt.Errorf("failed to delete check %q on node %q, index is stale", op.Check.CheckID, op.Check.Node) + } + + default: + err = fmt.Errorf("unknown Check verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.CheckGet { + result := structs.TxnResult{Check: entry} + return structs.TxnResults{&result}, nil + } + + clone := entry.Clone() + result := structs.TxnResult{Check: clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + // txnDispatch runs the given operations inside the state store transaction. func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index c1bd34a9f..371edcb40 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -916,7 +916,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool { c.Output != other.Output || c.ServiceID != other.ServiceID || c.ServiceName != other.ServiceName || - !reflect.DeepEqual(c.ServiceTags, other.ServiceTags) { + !reflect.DeepEqual(c.ServiceTags, other.ServiceTags) || + !reflect.DeepEqual(c.Definition, other.Definition) { return false } diff --git a/agent/structs/txn.go b/agent/structs/txn.go index c3b814b63..cdd97b05a 100644 --- a/agent/structs/txn.go +++ b/agent/structs/txn.go @@ -9,7 +9,7 @@ import ( ) // TxnKVOp is used to define a single operation on the KVS inside a -// transaction +// transaction. type TxnKVOp struct { Verb api.KVOp DirEnt DirEntry @@ -19,6 +19,17 @@ type TxnKVOp struct { // inside a transaction. type TxnKVResult *DirEntry +// TxnCheckOp is used to define a single operation on a health check inside a +// transaction. +type TxnCheckOp struct { + Verb api.CheckOp + Check HealthCheck +} + +// TxnCheckResult is used to define the result of a single operation on a health +// check inside a transaction. +type TxnCheckResult *HealthCheck + // TxnKVOp is used to define a single operation on an Intention inside a // transaction. type TxnIntentionOp IntentionRequest @@ -28,6 +39,7 @@ type TxnIntentionOp IntentionRequest type TxnOp struct { KV *TxnKVOp Intention *TxnIntentionOp + Check *TxnCheckOp } // TxnOps is a list of operations within a transaction. @@ -75,7 +87,8 @@ type TxnErrors []*TxnError // TxnResult is used to define the result of a given operation inside a // transaction. Only one of the types should be filled out per entry. type TxnResult struct { - KV TxnKVResult + KV TxnKVResult + Check TxnCheckResult } // TxnResults is a list of TxnResult entries. diff --git a/api/kv.go b/api/kv.go index 97f515685..cb79f1872 100644 --- a/api/kv.go +++ b/api/kv.go @@ -45,44 +45,6 @@ type KVPair struct { // KVPairs is a list of KVPair objects type KVPairs []*KVPair -// KVOp constants give possible operations available in a KVTxn. -type KVOp string - -const ( - KVSet KVOp = "set" - KVDelete KVOp = "delete" - KVDeleteCAS KVOp = "delete-cas" - KVDeleteTree KVOp = "delete-tree" - KVCAS KVOp = "cas" - KVLock KVOp = "lock" - KVUnlock KVOp = "unlock" - KVGet KVOp = "get" - KVGetTree KVOp = "get-tree" - KVCheckSession KVOp = "check-session" - KVCheckIndex KVOp = "check-index" - KVCheckNotExists KVOp = "check-not-exists" -) - -// KVTxnOp defines a single operation inside a transaction. -type KVTxnOp struct { - Verb KVOp - Key string - Value []byte - Flags uint64 - Index uint64 - Session string -} - -// KVTxnOps defines a set of operations to be performed inside a single -// transaction. -type KVTxnOps []*KVTxnOp - -// KVTxnResponse has the outcome of a transaction. -type KVTxnResponse struct { - Results []*KVPair - Errors TxnErrors -} - // KV is used to manipulate the K/V API type KV struct { c *Client @@ -300,121 +262,25 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption return res, qm, nil } -// TxnOp is the internal format we send to Consul. It's not specific to KV, -// though currently only KV operations are supported. -type TxnOp struct { - KV *KVTxnOp -} - -// TxnOps is a list of transaction operations. -type TxnOps []*TxnOp - -// TxnResult is the internal format we receive from Consul. -type TxnResult struct { - KV *KVPair -} - -// TxnResults is a list of TxnResult objects. -type TxnResults []*TxnResult - -// TxnError is used to return information about an operation in a transaction. -type TxnError struct { - OpIndex int - What string -} - -// TxnErrors is a list of TxnError objects. -type TxnErrors []*TxnError - -// TxnResponse is the internal format we receive from Consul. -type TxnResponse struct { - Results TxnResults - Errors TxnErrors -} - -// Txn is used to apply multiple KV operations in a single, atomic transaction. -// -// Note that Go will perform the required base64 encoding on the values -// automatically because the type is a byte slice. Transactions are defined as a -// list of operations to perform, using the KVOp constants and KVTxnOp structure -// to define operations. If any operation fails, none of the changes are applied -// to the state store. Note that this hides the internal raw transaction interface -// and munges the input and output types into KV-specific ones for ease of use. -// If there are more non-KV operations in the future we may break out a new -// transaction API client, but it will be easy to keep this KV-specific variant -// supported. -// -// Even though this is generally a write operation, we take a QueryOptions input -// and return a QueryMeta output. If the transaction contains only read ops, then -// Consul will fast-path it to a different endpoint internally which supports -// consistency controls, but not blocking. If there are write operations then -// the request will always be routed through raft and any consistency settings -// will be ignored. -// -// Here's an example: -// -// ops := KVTxnOps{ -// &KVTxnOp{ -// Verb: KVLock, -// Key: "test/lock", -// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", -// Value: []byte("hello"), -// }, -// &KVTxnOp{ -// Verb: KVGet, -// Key: "another/key", -// }, -// } -// ok, response, _, err := kv.Txn(&ops, nil) -// -// If there is a problem making the transaction request then an error will be -// returned. Otherwise, the ok value will be true if the transaction succeeded -// or false if it was rolled back. The response is a structured return value which -// will have the outcome of the transaction. Its Results member will have entries -// for each operation. Deleted keys will have a nil entry in the, and to save -// space, the Value of each key in the Results will be nil unless the operation -// is a KVGet. If the transaction was rolled back, the Errors member will have -// entries referencing the index of the operation that failed along with an error -// message. +// The Txn function has been deprecated from the KV object; please see the Txn +// object for more information about Transactions. func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) { - r := k.c.newRequest("PUT", "/v1/txn") - r.setQueryOptions(q) - - // Convert into the internal format since this is an all-KV txn. - ops := make(TxnOps, 0, len(txn)) - for _, kvOp := range txn { - ops = append(ops, &TxnOp{KV: kvOp}) + ops := make(TxnOps, len(txn)) + for _, op := range txn { + ops = append(ops, &TxnOp{KV: op}) } - r.obj = ops - rtt, resp, err := k.c.doRequest(r) + + respOk, txnResp, qm, err := k.c.txn(ops, q) if err != nil { return false, nil, nil, err } - defer resp.Body.Close() - qm := &QueryMeta{} - parseQueryMeta(resp, qm) - qm.RequestTime = rtt - - if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { - var txnResp TxnResponse - if err := decodeBody(resp, &txnResp); err != nil { - return false, nil, nil, err - } - - // Convert from the internal format. - kvResp := KVTxnResponse{ - Errors: txnResp.Errors, - } - for _, result := range txnResp.Results { - kvResp.Results = append(kvResp.Results, result.KV) - } - return resp.StatusCode == http.StatusOK, &kvResp, qm, nil + // Convert from the internal format. + kvResp := KVTxnResponse{ + Errors: txnResp.Errors, } - - var buf bytes.Buffer - if _, err := io.Copy(&buf, resp.Body); err != nil { - return false, nil, nil, fmt.Errorf("Failed to read response: %v", err) + for _, result := range txnResp.Results { + kvResp.Results = append(kvResp.Results, result.KV) } - return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String()) + return respOk, &kvResp, qm, nil } diff --git a/api/txn.go b/api/txn.go new file mode 100644 index 000000000..017b72672 --- /dev/null +++ b/api/txn.go @@ -0,0 +1,201 @@ +package api + +import ( + "bytes" + "fmt" + "io" + "net/http" +) + +// Txn is used to manipulate the Txn API +type Txn struct { + c *Client +} + +// Txn is used to return a handle to the K/V apis +func (c *Client) Txn() *Txn { + return &Txn{c} +} + +// TxnOp is the internal format we send to Consul. Currently only K/V and +// check operations are supported. +type TxnOp struct { + KV *KVTxnOp + Check *CheckTxnOp +} + +// TxnOps is a list of transaction operations. +type TxnOps []*TxnOp + +// TxnResult is the internal format we receive from Consul. +type TxnResult struct { + KV *KVPair + Check *HealthCheck +} + +// TxnResults is a list of TxnResult objects. +type TxnResults []*TxnResult + +// TxnError is used to return information about an operation in a transaction. +type TxnError struct { + OpIndex int + What string +} + +// TxnErrors is a list of TxnError objects. +type TxnErrors []*TxnError + +// TxnResponse is the internal format we receive from Consul. +type TxnResponse struct { + Results TxnResults + Errors TxnErrors +} + +// KVOp constants give possible operations available in a transaction. +type KVOp string + +const ( + KVSet KVOp = "set" + KVDelete KVOp = "delete" + KVDeleteCAS KVOp = "delete-cas" + KVDeleteTree KVOp = "delete-tree" + KVCAS KVOp = "cas" + KVLock KVOp = "lock" + KVUnlock KVOp = "unlock" + KVGet KVOp = "get" + KVGetTree KVOp = "get-tree" + KVCheckSession KVOp = "check-session" + KVCheckIndex KVOp = "check-index" + KVCheckNotExists KVOp = "check-not-exists" +) + +// KVTxnOp defines a single operation inside a transaction. +type KVTxnOp struct { + Verb KVOp + Key string + Value []byte + Flags uint64 + Index uint64 + Session string +} + +// KVTxnOps defines a set of operations to be performed inside a single +// transaction. +type KVTxnOps []*KVTxnOp + +// KVTxnResponse has the outcome of a transaction. +type KVTxnResponse struct { + Results []*KVPair + Errors TxnErrors +} + +// CheckOp constants give possible operations available in a transaction. +type CheckOp string + +const ( + CheckGet CheckOp = "get" + CheckSet CheckOp = "set" + CheckCAS CheckOp = "cas" + CheckDelete CheckOp = "delete" + CheckDeleteCAS CheckOp = "delete-cas" +) + +// CheckTxnOp defines a single operation inside a transaction. +type CheckTxnOp struct { + Verb CheckOp + Check HealthCheck +} + +// Txn is used to apply multiple Consul operations in a single, atomic transaction. +// +// Note that Go will perform the required base64 encoding on the values +// automatically because the type is a byte slice. Transactions are defined as a +// list of operations to perform, using the different fields in the TxnOp structure +// to define operations. If any operation fails, none of the changes are applied +// to the state store. +// +// Even though this is generally a write operation, we take a QueryOptions input +// and return a QueryMeta output. If the transaction contains only read ops, then +// Consul will fast-path it to a different endpoint internally which supports +// consistency controls, but not blocking. If there are write operations then +// the request will always be routed through raft and any consistency settings +// will be ignored. +// +// Here's an example: +// +// ops := KVTxnOps{ +// &KVTxnOp{ +// Verb: KVLock, +// Key: "test/lock", +// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", +// Value: []byte("hello"), +// }, +// &KVTxnOp{ +// Verb: KVGet, +// Key: "another/key", +// }, +// &CheckTxnOp{ +// Verb: CheckSet, +// HealthCheck: HealthCheck{ +// Node: "foo", +// CheckID: "redis:a", +// Name: "Redis Health Check", +// Status: "passing", +// }, +// } +// } +// ok, response, _, err := kv.Txn(&ops, nil) +// +// If there is a problem making the transaction request then an error will be +// returned. Otherwise, the ok value will be true if the transaction succeeded +// or false if it was rolled back. The response is a structured return value which +// will have the outcome of the transaction. Its Results member will have entries +// for each operation. For KV operations, Deleted keys will have a nil entry in the +// results, and to save space, the Value of each key in the Results will be nil +// unless the operation is a KVGet. If the transaction was rolled back, the Errors +// member will have entries referencing the index of the operation that failed +// along with an error message. +func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { + return t.c.txn(txn, q) +} + +func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { + r := c.newRequest("PUT", "/v1/txn") + r.setQueryOptions(q) + + // Convert into the internal txn format. + ops := make(TxnOps, 0, len(txn)) + for _, kvOp := range txn { + switch { + case kvOp.KV != nil: + ops = append(ops, &TxnOp{KV: kvOp.KV}) + case kvOp.Check != nil: + ops = append(ops, &TxnOp{Check: kvOp.Check}) + } + } + r.obj = ops + rtt, resp, err := c.doRequest(r) + if err != nil { + return false, nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { + var txnResp TxnResponse + if err := decodeBody(resp, &txnResp); err != nil { + return false, nil, nil, err + } + + return resp.StatusCode == http.StatusOK, &txnResp, qm, nil + } + + var buf bytes.Buffer + if _, err := io.Copy(&buf, resp.Body); err != nil { + return false, nil, nil, fmt.Errorf("Failed to read response: %v", err) + } + return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String()) +}