consul/state: working on service registration storage
This commit is contained in:
parent
011e7584b4
commit
f0dd8b2923
|
@ -91,7 +91,7 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ServiceID",
|
||||
Field: "ID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
|
@ -100,7 +100,7 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ServiceName",
|
||||
Field: "Service",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
|
|
|
@ -98,3 +98,45 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
|
|||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// EnsureService is called to upsert creation of a given NodeService.
|
||||
func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Call the service registration upsert
|
||||
if err := s.ensureServiceTxn(idx, svc, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureServiceTxn is used to upsert a service registration within an
|
||||
// existing memdb transaction.
|
||||
func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error {
|
||||
// Check for existing service
|
||||
existing, err := tx.First("services", "id", svc.Service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
||||
// Populate the indexes
|
||||
if existing != nil {
|
||||
svc.CreateIndex = existing.(*structs.NodeService).CreateIndex
|
||||
svc.ModifyIndex = idx
|
||||
} else {
|
||||
svc.CreateIndex = idx
|
||||
svc.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Insert the service and update the index
|
||||
if err := tx.Insert("services", svc); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -65,3 +65,21 @@ func TestStateStore_EnsureNode(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureService(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create the service registration
|
||||
in := &structs.NodeService{
|
||||
ID: "service1",
|
||||
Service: "redis",
|
||||
Tags: []string{"prod"},
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
}
|
||||
|
||||
// Service successfully registers into the state store
|
||||
if err := s.EnsureService(1, in); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -260,7 +260,10 @@ type NodeService struct {
|
|||
Address string
|
||||
Port int
|
||||
EnableTagOverride bool
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
type NodeServices struct {
|
||||
Node Node
|
||||
Services map[string]*NodeService
|
||||
|
|
Loading…
Reference in New Issue