diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 66e034366..c4d06a671 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -735,6 +735,36 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) return nil } +// ensureServiceCASTxn updates a service only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) { + // Retrieve the existing entry. + existing, err := tx.First("nodes", "id", node) + if err != nil { + return false, fmt.Errorf("node lookup failed: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if svc.ModifyIndex == 0 && existing != nil { + return false, nil + } + if svc.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.Node) + if ok && svc.ModifyIndex != 0 && svc.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { + return false, err + } + + return true, nil +} + // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { @@ -1111,15 +1141,26 @@ func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs idx := maxIndexTxn(tx, "services") // Query the service - service, err := tx.First("services", "id", nodeName, serviceID) + service, err := s.nodeServiceTxn(tx, nodeName, serviceID) if err != nil { return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } - if service != nil { - return idx, service.(*structs.ServiceNode).ToNodeService(), nil + return idx, service, nil +} + +func (s *Store) nodeServiceTxn(tx *memdb.Txn, nodeName, serviceID string) (*structs.NodeService, error) { + // Query the service + service, err := tx.First("services", "id", nodeName, serviceID) + if err != nil { + return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } - return idx, nil, nil + + if service != nil { + return service.(*structs.ServiceNode).ToNodeService(), nil + } + + return nil, nil } // NodeServices is used to query service registrations by node name or UUID. @@ -1214,6 +1255,35 @@ func serviceIndexName(name string) string { return fmt.Sprintf("service.%s", name) } +// deleteServiceCASTxn is used to try doing a service delete operation with a given +// raft index. If the CAS index specified is not equal to the last observed index for +// the given service, then the call is a noop, otherwise a normal delete is invoked. +func (s *Store) deleteServiceCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName, serviceID string) (bool, error) { + // Look up the service. + service, err := tx.First("services", "id", nodeName, serviceID) + if err != nil { + return false, fmt.Errorf("check lookup failed: %s", err) + } + if service == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + existing, ok := service.(*structs.ServiceNode) + if !ok || existing.ModifyIndex != cidx { + return existing == nil, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil { + return false, err + } + + return true, nil +} + // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index 082271cff..578e9721a 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -175,6 +175,59 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc return nil, nil } +// txnService handles all Service-related operations. +func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { + var entry *structs.NodeService + var err error + + switch op.Verb { + case api.ServiceGet: + entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID) + + case api.ServiceSet: + err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service) + + case api.ServiceCAS: + var ok bool + ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service) + if !ok && err == nil { + err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node) + } + + case api.ServiceDelete: + err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID) + + case api.ServiceDeleteCAS: + var ok bool + ok, err = s.deleteServiceCASTxn(tx, idx, op.Service.ModifyIndex, op.Node, op.Service.ID) + if !ok && err == nil { + err = fmt.Errorf("failed to delete service %q on node %q, index is stale", op.Service.ID, op.Node) + } + + default: + err = fmt.Errorf("unknown Service verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.ServiceGet { + result := structs.TxnResult{Service: entry} + return structs.TxnResults{&result}, nil + } + + clone := *entry + result := structs.TxnResult{Service: &clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + // txnCheck handles all Check-related operations. func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { var entry *structs.HealthCheck @@ -247,6 +300,12 @@ func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (stru ret, err = s.txnKVS(tx, idx, op.KV) case op.Intention != nil: err = s.txnIntention(tx, idx, op.Intention) + case op.Node != nil: + ret, err = s.txnNode(tx, idx, op.Node) + case op.Service != nil: + ret, err = s.txnService(tx, idx, op.Service) + case op.Check != nil: + ret, err = s.txnCheck(tx, idx, op.Check) default: err = fmt.Errorf("no operation specified") } diff --git a/agent/structs/txn.go b/agent/structs/txn.go index 12453bb3a..5d136ccf7 100644 --- a/agent/structs/txn.go +++ b/agent/structs/txn.go @@ -34,6 +34,7 @@ type TxnNodeResult *Node // a transaction. type TxnServiceOp struct { Verb api.ServiceOp + Node string Service NodeService }