nomad: adding compound index on alloc terminal status

This commit is contained in:
Armon Dadgar 2016-02-20 11:24:06 -08:00
parent 2ec5d7de76
commit abf7e52689
3 changed files with 92 additions and 4 deletions

View file

@ -222,9 +222,27 @@ func allocTableSchema() *memdb.TableSchema {
Name: "node",
AllowMissing: true, // Missing is allow for failed allocations
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "NodeID",
Lowercase: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "NodeID",
Lowercase: true,
},
// Conditional indexer on if allocation is terminal
&memdb.ConditionalIndex{
Conditional: func(obj interface{}) (bool, error) {
// Cast to allocation
alloc, ok := obj.(*structs.Allocation)
if !ok {
return false, fmt.Errorf("wrong type, got %t should be Allocation", obj)
}
// Check if the allocation is terminal
return alloc.TerminalStatus(), nil
},
},
},
},
},

View file

@ -865,8 +865,30 @@ func (s *StateStore) AllocsByIDPrefix(id string) (memdb.ResultIterator, error) {
func (s *StateStore) AllocsByNode(node string) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
// Get an iterator over the node allocations, using only the
// node prefix which ignores the terminal status
iter, err := txn.Get("allocs", "node_prefix", node)
if err != nil {
return nil, err
}
var out []*structs.Allocation
for {
raw := iter.Next()
if raw == nil {
break
}
out = append(out, raw.(*structs.Allocation))
}
return out, nil
}
// AllocsByNode returns all the allocations by node and terminal status
func (s *StateStore) AllocsByNodeTerminal(node string, terminal bool) ([]*structs.Allocation, error) {
txn := s.db.Txn(false)
// Get an iterator over the node allocations
iter, err := txn.Get("allocs", "node", node)
iter, err := txn.Get("allocs", "node", node, terminal)
if err != nil {
return nil, err
}

View file

@ -1581,6 +1581,54 @@ func TestStateStore_AllocsByNode(t *testing.T) {
}
}
func TestStateStore_AllocsByNodeTerminal(t *testing.T) {
state := testStateStore(t)
var allocs, term, nonterm []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.NodeID = "foo"
if i%2 == 0 {
alloc.DesiredStatus = structs.AllocDesiredStatusStop
term = append(term, alloc)
} else {
nonterm = append(nonterm, alloc)
}
allocs = append(allocs, alloc)
}
err := state.UpsertAllocs(1000, allocs)
if err != nil {
t.Fatalf("err: %v", err)
}
// Verify the terminal allocs
out, err := state.AllocsByNodeTerminal("foo", true)
if err != nil {
t.Fatalf("err: %v", err)
}
sort.Sort(AllocIDSort(term))
sort.Sort(AllocIDSort(out))
if !reflect.DeepEqual(term, out) {
t.Fatalf("bad: %#v %#v", term, out)
}
// Verify the non-terminal allocs
out, err = state.AllocsByNodeTerminal("foo", false)
if err != nil {
t.Fatalf("err: %v", err)
}
sort.Sort(AllocIDSort(nonterm))
sort.Sort(AllocIDSort(out))
if !reflect.DeepEqual(nonterm, out) {
t.Fatalf("bad: %#v %#v", nonterm, out)
}
}
func TestStateStore_AllocsByJob(t *testing.T) {
state := testStateStore(t)
var allocs []*structs.Allocation