Merge pull request #17 from hashicorp/f-kv

Adding simple Key/Value Store
This commit is contained in:
Armon Dadgar 2014-04-01 12:44:13 -07:00
commit 80b2c35b46
17 changed files with 1937 additions and 23 deletions

View File

@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService))
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

View File

@ -8,6 +8,7 @@ register new services.
The URLs are also versioned to allow for changes in the API.
The current URLs supported are:
Catalog:
* /v1/catalog/register : Registers a new service
* /v1/catalog/deregister : Deregisters a service or node
* /v1/catalog/datacenters : Lists known datacenters
@ -16,15 +17,17 @@ The current URLs supported are:
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
* /v1/catalog/node/<node>/ : Lists the services provided by a node
* Health system:
Health system:
* /v1/health/node/<node>: Returns the health info of a node
* /v1/health/checks/<service>: Returns the checks of a service
* /v1/health/service/<service>: Returns the nodes and health info of a service
* /v1/health/state/<state>: Returns the checks in a given state
Status:
* /v1/status/leader : Returns the current Raft leader
* /v1/status/peers : Returns the current Raft peer set
Agent:
* /v1/agent/checks: Returns the checks the local agent is managing
* /v1/agent/services : Returns the services local agent is managing
* /v1/agent/members : Returns the members as seen by the local serf agent
@ -37,3 +40,7 @@ The current URLs supported are:
* /v1/agent/check/fail/<name>
* /v1/agent/service/register
* /v1/agent/service/deregister/<name>
KVS:
* /v1/kv/<key>

View File

@ -0,0 +1,153 @@
package agent
import (
"bytes"
"github.com/hashicorp/consul/consul/structs"
"io"
"net/http"
"strconv"
"strings"
)
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.KeyRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return nil, nil
}
// Pull out the key name, validation left to each sub-handler
args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/")
// Switch on the method
switch req.Method {
case "GET":
return s.KVSGet(resp, req, &args)
case "PUT":
return s.KVSPut(resp, req, &args)
case "DELETE":
return s.KVSDelete(resp, req, &args)
default:
resp.WriteHeader(405)
return nil, nil
}
return nil, nil
}
// KVSGet handles a GET request
func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
// Check for recurse
method := "KVS.Get"
params := req.URL.Query()
if _, ok := params["recurse"]; ok {
method = "KVS.List"
} else if missingKey(resp, args) {
return nil, nil
}
// Make the RPC
var out structs.IndexedDirEntries
if err := s.agent.RPC(method, &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
// Check if we get a not found
if len(out.Entries) == 0 {
resp.WriteHeader(404)
return nil, nil
}
return out.Entries, nil
}
// KVSPut handles a PUT request
func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
if missingKey(resp, args) {
return nil, nil
}
applyReq := structs.KVSRequest{
Datacenter: args.Datacenter,
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: args.Key,
Flags: 0,
Value: nil,
},
}
// Check for flags
params := req.URL.Query()
if _, ok := params["flags"]; ok {
flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64)
if err != nil {
return nil, err
}
applyReq.DirEnt.Flags = flagVal
}
// Check for cas value
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, err
}
applyReq.DirEnt.ModifyIndex = casVal
applyReq.Op = structs.KVSCAS
}
// Copy the value
buf := bytes.NewBuffer(nil)
if _, err := io.Copy(buf, req.Body); err != nil {
return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
if applyReq.Op == structs.KVSSet {
return true, nil
} else {
return out, nil
}
}
// KVSPut handles a DELETE request
func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
applyReq := structs.KVSRequest{
Datacenter: args.Datacenter,
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: args.Key,
},
}
// Check for recurse
params := req.URL.Query()
if _, ok := params["recurse"]; ok {
applyReq.Op = structs.KVSDeleteTree
} else if missingKey(resp, args) {
return nil, nil
}
// Make the RPC
var out bool
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
return nil, nil
}
// missingKey checks if the key is missing
func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool {
if args.Key == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing key name"))
return true
}
return false
}

View File

@ -0,0 +1,291 @@
package agent
import (
"bytes"
"fmt"
"github.com/hashicorp/consul/consul/structs"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
)
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Wait for a leader
time.Sleep(100 * time.Millisecond)
keys := []string{
"baz",
"bar",
"foo/sub1",
"foo/sub2",
"zip",
}
for _, key := range keys {
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
for _, key := range keys {
req, err := http.NewRequest("GET", "/v1/kv/"+key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
res, ok := obj.(structs.DirEntries)
if !ok {
t.Fatalf("should work")
}
if len(res) != 1 {
t.Fatalf("bad: %v", res)
}
if res[0].Key != key {
t.Fatalf("bad: %v", res)
}
}
for _, key := range keys {
req, err := http.NewRequest("DELETE", "/v1/kv/"+key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
}
}
func TestKVSEndpoint_Recurse(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Wait for a leader
time.Sleep(100 * time.Millisecond)
keys := []string{
"bar",
"baz",
"foo/sub1",
"foo/sub2",
"zip",
}
for _, key := range keys {
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/"+key, buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
{
// Get all the keys
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
header := resp.Header().Get("X-Consul-Index")
if header == "" {
t.Fatalf("Bad: %v", header)
}
res, ok := obj.(structs.DirEntries)
if !ok {
t.Fatalf("should work")
}
if len(res) != len(keys) {
t.Fatalf("bad: %v", res)
}
for idx, key := range keys {
if res[idx].Key != key {
t.Fatalf("bad: %v %v", res[idx].Key, key)
}
}
}
{
req, err := http.NewRequest("DELETE", "/v1/kv/?recurse", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
_, err = srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
}
{
// Get all the keys
req, err := http.NewRequest("GET", "/v1/kv/?recurse", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if obj != nil {
t.Fatalf("bad: %v", obj)
}
}
}
func TestKVSEndpoint_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Wait for a leader
time.Sleep(100 * time.Millisecond)
{
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/test?flags=50", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
req, err := http.NewRequest("GET", "/v1/kv/test", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
d := obj.(structs.DirEntries)[0]
// Check the flags
if d.Flags != 50 {
t.Fatalf("bad: %v", d)
}
// Create a CAS request, bad index
{
buf := bytes.NewBuffer([]byte("zip"))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex-1), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); res {
t.Fatalf("should NOT work")
}
}
// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte("zip"))
req, err := http.NewRequest("PUT",
fmt.Sprintf("/v1/kv/test?flags=42&cas=%d", d.ModifyIndex), buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
}
// Verify the update
req, _ = http.NewRequest("GET", "/v1/kv/test", nil)
resp = httptest.NewRecorder()
obj, _ = srv.KVSEndpoint(resp, req)
d = obj.(structs.DirEntries)[0]
if d.Flags != 42 {
t.Fatalf("bad: %v", d)
}
if string(d.Value) != "zip" {
t.Fatalf("bad: %v", d)
}
}

View File

@ -65,6 +65,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.decodeRegister(buf[1:], log.Index)
case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:], log.Index)
case structs.KVSRequestType:
return c.applyKVSOperation(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
@ -131,6 +133,32 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
return nil
}
func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
var req structs.KVSRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
switch req.Op {
case structs.KVSSet:
return c.state.KVSSet(index, &req.DirEnt)
case structs.KVSDelete:
return c.state.KVSDelete(index, req.DirEnt.Key)
case structs.KVSDeleteTree:
return c.state.KVSDeleteTree(index, req.DirEnt.Key)
case structs.KVSCAS:
act, err := c.state.KVSCheckAndSet(index, &req.DirEnt)
if err != nil {
return err
} else {
return act
}
default:
c.logger.Printf("[WARN] consul.fsm: Invalid KVS operation '%s'", req.Op)
return fmt.Errorf("Invalid KVS operation '%s'", req.Op)
}
return nil
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))
@ -152,6 +180,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err != nil {
return err
}
c.state.Close()
c.state = state
// Create a decoder
@ -184,6 +213,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
c.applyRegister(&req, header.LastIndex)
case structs.KVSRequestType:
var req structs.DirEntry
if err := dec.Decode(&req); err != nil {
return err
}
if err := c.state.KVSRestore(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -247,6 +285,38 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
}
}
}
// Enable GC of the ndoes
nodes = nil
// Dump the KVS entries
streamCh := make(chan interface{}, 256)
errorCh := make(chan error)
go func() {
if err := s.state.KVSDump(streamCh); err != nil {
errorCh <- err
}
}()
OUTER:
for {
select {
case raw := <-streamCh:
if raw == nil {
break OUTER
}
sink.Write([]byte{byte(structs.KVSRequestType)})
if err := encoder.Encode(raw); err != nil {
sink.Cancel()
return err
}
case err := <-errorCh:
sink.Cancel()
return err
}
}
return nil
}

View File

@ -322,6 +322,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "web",
})
fsm.state.KVSSet(8, &structs.DirEntry{
Key: "/test",
Value: []byte("foo"),
})
// Snapshot
snap, err := fsm.Snapshot()
@ -370,4 +374,198 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(d.Value) != "foo" {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_KVSSet(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
}
func TestFSM_KVSDelete(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete
req.Op = structs.KVSDelete
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSDeleteTree(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete tree
req.Op = structs.KVSDeleteTree
req.DirEnt.Key = "/test"
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSCheckAndSet(t *testing.T) {
fsm, err := NewFSM(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
defer fsm.Close()
req := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}
// Run the check-and-set
req.Op = structs.KVSCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
req.DirEnt.Value = []byte("zip")
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp.(bool) != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is updated
_, d, err = fsm.state.KVSGet("/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(d.Value) != "zip" {
t.Fatalf("bad: %v", d)
}
}

115
consul/kvs_endpoint.go Normal file
View File

@ -0,0 +1,115 @@
package consul
import (
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"time"
)
// KVS endpoint is used to manipulate the Key-Value store
type KVS struct {
srv *Server
}
// Apply is used to apply a KVS request to the data store. This should
// only be used for operations that modify the data
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if done, err := k.srv.forward("KVS.Apply", args.Datacenter, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
// Verify the args
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
return fmt.Errorf("Must provide key")
}
// Apply the update
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// Get is used to lookup a single key
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args.Datacenter, args, reply); done {
return err
}
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("KVSGet"),
func() (uint64, error) {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return 0, err
}
if ent == nil {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Entries = nil
} else {
reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
}
return reply.Index, nil
})
}
// List is used to list all keys with a given prefix
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.List", args.Datacenter, args, reply); done {
return err
}
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.BlockingQuery,
state.QueryTables("KVSList"),
func() (uint64, error) {
index, ent, err := state.KVSList(args.Key)
if err != nil {
return 0, err
}
if len(ent) == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Entries = nil
} else {
// Determine the maximum affected index
var maxIndex uint64
for _, e := range ent {
if e.ModifyIndex > maxIndex {
maxIndex = e.ModifyIndex
}
}
reply.Index = maxIndex
reply.Entries = ent
}
return reply.Index, nil
})
}

173
consul/kvs_endpoint_test.go Normal file
View File

@ -0,0 +1,173 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"os"
"testing"
"time"
)
func TestKVS_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
// Wait for leader
time.Sleep(100 * time.Millisecond)
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
// Do a check and set
arg.Op = structs.KVSCAS
arg.DirEnt.ModifyIndex = d.ModifyIndex
arg.DirEnt.Flags = 43
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Check this was applied
if out != true {
t.Fatalf("bad: %v", out)
}
// Verify
_, d, err = state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d.Flags != 43 {
t.Fatalf("bad: %v", d)
}
}
func TestKVS_Get(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
// Wait for leader
time.Sleep(100 * time.Millisecond)
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if d.Flags != 42 {
t.Fatalf("bad: %v", d)
}
if string(d.Value) != "test" {
t.Fatalf("bad: %v", d)
}
}
func TestKVSEndpoint_List(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
// Wait for leader
time.Sleep(100 * time.Millisecond)
keys := []string{
"/test/key1",
"/test/key2",
"/test/sub/key3",
}
for _, key := range keys {
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: key,
Flags: 1,
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "/test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 3 {
t.Fatalf("Bad: %v", dirent.Entries)
}
for i := 0; i < len(dirent.Entries); i++ {
d := dirent.Entries[i]
if d.Key != keys[i] {
t.Fatalf("bad: %v", d)
}
if d.Flags != 1 {
t.Fatalf("bad: %v", d)
}
if d.Value != nil {
t.Fatalf("bad: %v", d)
}
}
}

View File

@ -49,10 +49,13 @@ type MDBIndex struct {
Unique bool // Controls if values are unique
Fields []string // Fields are used to build the index
IdxFunc IndexFunc // Can be used to provide custom indexing
Virtual bool // Virtual index does not exist, but can be used for queries
RealIndex string // Virtual indexes use a RealIndex for iteration
table *MDBTable
name string
dbiName string
table *MDBTable
name string
dbiName string
realIndex *MDBIndex
}
// MDBTxn is used to wrap an underlying transaction
@ -88,6 +91,17 @@ func DefaultIndexFunc(idx *MDBIndex, parts []string) string {
return prefix
}
// DefaultIndexPrefixFunc can be used with DefaultIndexFunc to scan
// for index prefix values. This should only be used as part of a
// virtual index.
func DefaultIndexPrefixFunc(idx *MDBIndex, parts []string) string {
if len(parts) == 0 {
return "_"
}
prefix := "_" + strings.Join(parts, "||")
return prefix
}
// Init is used to initialize the MDBTable and ensure it's ready
func (t *MDBTable) Init() error {
if t.Env == nil {
@ -111,6 +125,9 @@ func (t *MDBTable) Init() error {
if id.AllowBlank {
return fmt.Errorf("id index must not allow blanks")
}
if id.Virtual {
return fmt.Errorf("id index cannot be virtual")
}
// Create the table
if err := t.createTable(); err != nil {
@ -221,6 +238,9 @@ EXTEND:
mdbTxn.dbis[t.Name] = dbi
for _, index := range t.Indexes {
if index.Virtual {
continue
}
dbi, err := index.openDBI(tx)
if err != nil {
tx.Abort()
@ -237,6 +257,9 @@ func (t *MDBTable) objIndexKeys(obj interface{}) (map[string][]byte, error) {
// Construct the indexes keys
indexes := make(map[string][]byte)
for name, index := range t.Indexes {
if index.Virtual {
continue
}
key, err := index.keyFromObject(obj)
if err != nil {
return nil, err
@ -301,6 +324,9 @@ AFTER_DELETE:
// Insert the new indexes
for name, index := range t.Indexes {
if index.Virtual {
continue
}
dbi := tx.dbis[index.dbiName]
if err := tx.tx.Put(dbi, indexes[name], encRowId, 0); err != nil {
return err
@ -350,6 +376,29 @@ func (t *MDBTable) GetTxn(tx *MDBTxn, index string, parts ...string) ([]interfac
return results, err
}
// StreamTxn is like GetTxn but it streams the results over a channel.
// This can be used if the expected data set is very large. The stream
// is always closed on return.
func (t *MDBTable) StreamTxn(stream chan<- interface{}, tx *MDBTxn, index string, parts ...string) error {
// Always close the stream on return
defer close(stream)
// Get the associated index
idx, key, err := t.getIndex(index, parts)
if err != nil {
return err
}
// Stream the results
err = idx.iterate(tx, key, func(encRowId, res []byte) bool {
obj := t.Decoder(res)
stream <- obj
return false
})
return err
}
// getIndex is used to get the proper index, and also check the arity
func (t *MDBTable) getIndex(index string, parts []string) (*MDBIndex, []byte, error) {
// Get the index
@ -427,6 +476,12 @@ func (t *MDBTable) deleteWithIndex(tx *MDBTxn, idx *MDBIndex, key []byte) (num i
if name == idx.name {
continue
}
if idx.Virtual && name == idx.RealIndex {
continue
}
if otherIdx.Virtual {
continue
}
dbi := tx.dbis[otherIdx.dbiName]
if err := tx.tx.Del(dbi, indexes[name], encRowId); err != nil {
panic(err)
@ -464,11 +519,23 @@ func (i *MDBIndex) init(table *MDBTable, name string) error {
if err := i.createIndex(); err != nil {
return err
}
// Verify real index exists
if i.Virtual {
if realIndex, ok := table.Indexes[i.RealIndex]; !ok {
return fmt.Errorf("real index '%s' missing", i.RealIndex)
} else {
i.realIndex = realIndex
}
}
return nil
}
// createIndex is used to ensure the index exists
func (i *MDBIndex) createIndex() error {
// Do not create if this is a virtual index
if i.Virtual {
return nil
}
tx, err := i.table.Env.BeginTxn(nil, 0)
if err != nil {
return err
@ -529,7 +596,14 @@ func (i *MDBIndex) keyFromParts(parts ...string) []byte {
func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
cb func(encRowId, res []byte) bool) error {
table := tx.dbis[i.table.Name]
dbi := tx.dbis[i.dbiName]
// If virtual, use the correct DBI
var dbi mdb.DBI
if i.Virtual {
dbi = tx.dbis[i.realIndex.dbiName]
} else {
dbi = tx.dbis[i.dbiName]
}
cursor, err := tx.tx.CursorOpen(dbi)
if err != nil {

View File

@ -781,3 +781,194 @@ func TestMDBTableDelete_Prefix(t *testing.T) {
t.Fatalf("expect 2 result: %#v", res)
}
}
func TestMDBTableVirtualIndex(t *testing.T) {
dir, env := testMDBEnv(t)
defer os.RemoveAll(dir)
defer env.Close()
table := &MDBTable{
Env: env,
Name: "test",
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"First"},
},
"id_prefix": &MDBIndex{
Virtual: true,
RealIndex: "id",
Fields: []string{"First"},
IdxFunc: DefaultIndexPrefixFunc,
},
},
Encoder: MockEncoder,
Decoder: MockDecoder,
}
if err := table.Init(); err != nil {
t.Fatalf("err: %v", err)
}
if table.lastRowID != 0 {
t.Fatalf("bad last row id: %d", table.lastRowID)
}
objs := []*MockData{
&MockData{
Key: "1",
First: "Jack",
Last: "Smith",
Country: "USA",
},
&MockData{
Key: "2",
First: "John",
Last: "Wang",
Country: "USA",
},
&MockData{
Key: "3",
First: "James",
Last: "Torres",
Country: "Mexico",
},
}
// Insert some mock objects
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(4 * idx)); err != nil {
t.Fatalf("err: %v", err)
}
}
if table.lastRowID != 3 {
t.Fatalf("bad last row id: %d", table.lastRowID)
}
if idx, _ := table.LastIndex(); idx != 8 {
t.Fatalf("bad last idx: %d", idx)
}
_, res, err := table.Get("id_prefix", "J")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 3 {
t.Fatalf("expect 3 result: %#v", res)
}
_, res, err = table.Get("id_prefix", "Ja")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 2 {
t.Fatalf("expect 2 result: %#v", res)
}
num, err := table.Delete("id_prefix", "Ja")
if err != nil {
t.Fatalf("err: %v", err)
}
if num != 2 {
t.Fatalf("expect 2 result: %#v", num)
}
_, res, err = table.Get("id_prefix", "J")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(res) != 1 {
t.Fatalf("expect 1 result: %#v", res)
}
}
func TestMDBTableStream(t *testing.T) {
dir, env := testMDBEnv(t)
defer os.RemoveAll(dir)
defer env.Close()
table := &MDBTable{
Env: env,
Name: "test",
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Key"},
},
"name": &MDBIndex{
Fields: []string{"First", "Last"},
},
"country": &MDBIndex{
Fields: []string{"Country"},
},
},
Encoder: MockEncoder,
Decoder: MockDecoder,
}
if err := table.Init(); err != nil {
t.Fatalf("err: %v", err)
}
objs := []*MockData{
&MockData{
Key: "1",
First: "Kevin",
Last: "Smith",
Country: "USA",
},
&MockData{
Key: "2",
First: "Kevin",
Last: "Wang",
Country: "USA",
},
&MockData{
Key: "3",
First: "Bernardo",
Last: "Torres",
Country: "Mexico",
},
}
// Insert some mock objects
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
t.Fatalf("err: %v", err)
}
}
// Start a readonly txn
tx, err := table.StartTxn(true, nil)
if err != nil {
panic(err)
}
defer tx.Abort()
// Stream the records
streamCh := make(chan interface{})
go func() {
if err := table.StreamTxn(streamCh, tx, "id"); err != nil {
t.Fatalf("err: %v", err)
}
}()
// Verify we get them all
idx := 0
for obj := range streamCh {
p := obj.(*MockData)
if !reflect.DeepEqual(p, objs[idx]) {
t.Fatalf("bad: %#v %#v", p, objs[idx])
}
idx++
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}

View File

@ -18,3 +18,8 @@ func (r *Raft) RemovePeer(args string, reply *struct{}) error {
future := r.server.raft.RemovePeer(peer)
return future.Error()
}
func (r *Raft) Snapshot(args struct{}, reply *struct{}) error {
future := r.server.raft.Snapshot()
return future.Error()
}

View File

@ -101,6 +101,7 @@ type endpoints struct {
Health *Health
Raft *Raft
Status *Status
KVS *KVS
}
// NewServer is used to construct a new Consul server from the
@ -276,12 +277,14 @@ func (s *Server) setupRPC() error {
s.endpoints.Raft = &Raft{s}
s.endpoints.Catalog = &Catalog{s}
s.endpoints.Health = &Health{s}
s.endpoints.KVS = &KVS{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.Raft)
s.rpcServer.Register(s.endpoints.Catalog)
s.rpcServer.Register(s.endpoints.Health)
s.rpcServer.Register(s.endpoints.KVS)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

View File

@ -14,7 +14,8 @@ const (
dbNodes = "nodes"
dbServices = "services"
dbChecks = "checks"
dbMaxMapSize = 128 * 1024 * 1024 // 128MB maximum size
dbKVS = "kvs"
dbMaxMapSize = 512 * 1024 * 1024 // 512MB maximum size
)
// The StateStore is responsible for maintaining all the Consul
@ -31,6 +32,7 @@ type StateStore struct {
nodeTable *MDBTable
serviceTable *MDBTable
checkTable *MDBTable
kvsTable *MDBTable
tables MDBTables
watch map[*MDBTable]*NotifyGroup
queryTables map[string]MDBTables
@ -183,8 +185,31 @@ func (s *StateStore) initialize() error {
},
}
s.kvsTable = &MDBTable{
Name: dbKVS,
Indexes: map[string]*MDBIndex{
"id": &MDBIndex{
Unique: true,
Fields: []string{"Key"},
},
"id_prefix": &MDBIndex{
Virtual: true,
RealIndex: "id",
Fields: []string{"Key"},
IdxFunc: DefaultIndexPrefixFunc,
},
},
Decoder: func(buf []byte) interface{} {
out := new(structs.DirEntry)
if err := structs.Decode(buf, out); err != nil {
panic(err)
}
return out
},
}
// Store the set of tables
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable}
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, s.kvsTable}
for _, table := range s.tables {
table.Env = s.env
table.Encoder = encoder
@ -206,6 +231,8 @@ func (s *StateStore) initialize() error {
"NodeChecks": MDBTables{s.checkTable},
"ServiceChecks": MDBTables{s.checkTable},
"CheckServiceNodes": MDBTables{s.nodeTable, s.serviceTable, s.checkTable},
"KVSGet": MDBTables{s.kvsTable},
"KVSList": MDBTables{s.kvsTable},
}
return nil
}
@ -686,6 +713,159 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e
return nodes
}
// KVSSet is used to create or update a KV entry
func (s *StateStore) KVSSet(index uint64, d *structs.DirEntry) error {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
// Get the existing node
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
if err != nil {
return err
}
// Set the create and modify times
if len(res) == 0 {
d.CreateIndex = index
} else {
d.CreateIndex = res[0].(*structs.DirEntry).CreateIndex
}
d.ModifyIndex = index
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return err
}
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.kvsTable].Notify()
return tx.Commit()
}
// KVSRestore is used to restore a DirEntry. It should only be used when
// doing a restore, otherwise KVSSet should be used.
func (s *StateStore) KVSRestore(d *structs.DirEntry) error {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return err
}
return tx.Commit()
}
// KVSGet is used to get a KV entry
func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
idx, res, err := s.kvsTable.Get("id", key)
var d *structs.DirEntry
if len(res) > 0 {
d = res[0].(*structs.DirEntry)
}
return idx, d, err
}
// KVSList is used to list all KV entries with a prefix
func (s *StateStore) KVSList(prefix string) (uint64, structs.DirEntries, error) {
idx, res, err := s.kvsTable.Get("id_prefix", prefix)
ents := make(structs.DirEntries, len(res))
for idx, r := range res {
ents[idx] = r.(*structs.DirEntry)
}
return idx, ents, err
}
// KVSDelete is used to delete a KVS entry
func (s *StateStore) KVSDelete(index uint64, key string) error {
return s.kvsDeleteWithIndex(index, "id", key)
}
// KVSDeleteTree is used to delete all keys with a given prefix
func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
if prefix == "" {
return s.kvsDeleteWithIndex(index, "id")
}
return s.kvsDeleteWithIndex(index, "id_prefix", prefix)
}
// kvsDeleteWithIndex does a delete with either the id or id_prefix
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
if err != nil {
return err
}
if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
defer s.watch[s.kvsTable].Notify()
}
return tx.Commit()
}
// KVSCheckAndSet is used to perform an atomic check-and-set
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
// Start a new txn
tx, err := s.kvsTable.StartTxn(false, nil)
if err != nil {
return false, err
}
defer tx.Abort()
// Get the existing node
res, err := s.kvsTable.GetTxn(tx, "id", d.Key)
if err != nil {
return false, err
}
// Get the existing node if any
var exist *structs.DirEntry
if len(res) > 0 {
exist = res[0].(*structs.DirEntry)
}
// Use the ModifyIndex as the constraint. A modify of time of 0
// means we are doing a set-if-not-exists, while any other value
// means we expect that modify time.
if d.ModifyIndex == 0 && exist != nil {
return false, nil
} else if d.ModifyIndex > 0 && (exist == nil || exist.ModifyIndex != d.ModifyIndex) {
return false, nil
}
// Set the create and modify times
if exist == nil {
d.CreateIndex = index
} else {
d.CreateIndex = exist.CreateIndex
}
d.ModifyIndex = index
if err := s.kvsTable.InsertTxn(tx, d); err != nil {
return false, err
}
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return false, err
}
defer s.watch[s.kvsTable].Notify()
return true, tx.Commit()
}
// Snapshot is used to create a point in time snapshot
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// Begin a new txn on all tables
@ -742,3 +922,10 @@ func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
_, checks := s.store.parseHealthChecks(s.lastIndex, res, err)
return checks
}
// KVSDump is used to list all KV entries. It takes a channel and streams
// back *struct.DirEntry objects. This will block and should be invoked
// in a goroutine.
func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
}

View File

@ -550,6 +550,16 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("err: %v")
}
// Add some KVS entries
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(14, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(15, d); err != nil {
t.Fatalf("err: %v", err)
}
// Take a snapshot
snap, err := store.Snapshot()
if err != nil {
@ -558,7 +568,7 @@ func TestStoreSnapshot(t *testing.T) {
defer snap.Close()
// Check the last nodes
if idx := snap.LastIndex(); idx != 13 {
if idx := snap.LastIndex(); idx != 15 {
t.Fatalf("bad: %v", idx)
}
@ -591,6 +601,28 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("bad: %v", checks[0])
}
// Check we have the entries
streamCh := make(chan interface{}, 64)
doneCh := make(chan struct{})
var ents []*structs.DirEntry
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.KVSDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 2 {
t.Fatalf("missing KVS entries!")
}
// Make some changes!
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v", err)
@ -612,6 +644,10 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("err: %v")
}
if err := store.KVSDelete(18, "/web/a"); err != nil {
t.Fatalf("err: %v")
}
// Check snapshot has old values
nodes = snap.Nodes()
if len(nodes) != 2 {
@ -639,6 +675,28 @@ func TestStoreSnapshot(t *testing.T) {
if !reflect.DeepEqual(checks[0], check) {
t.Fatalf("bad: %v", checks[0])
}
// Check we have the entries
streamCh = make(chan interface{}, 64)
doneCh = make(chan struct{})
ents = nil
go func() {
for {
obj := <-streamCh
if obj == nil {
close(doneCh)
return
}
ents = append(ents, obj.(*structs.DirEntry))
}
}()
if err := snap.KVSDump(streamCh); err != nil {
t.Fatalf("err: %v", err)
}
<-doneCh
if len(ents) != 2 {
t.Fatalf("missing KVS entries!")
}
}
func TestEnsureCheck(t *testing.T) {
@ -933,3 +991,279 @@ func TestSS_Register_Deregister_Query(t *testing.T) {
t.Fatalf("Bad: %v", nodes)
}
}
func TestKVSSet_Get(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Should not exist
idx, d, err := store.KVSGet("/foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 0 {
t.Fatalf("bad: %v", idx)
}
if d != nil {
t.Fatalf("bad: %v", d)
}
// Create the entry
d = &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
// Should exist exist
idx, d, err = store.KVSGet("/foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1000 {
t.Fatalf("bad: %v", idx)
}
if d.CreateIndex != 1000 {
t.Fatalf("bad: %v", d)
}
if d.ModifyIndex != 1000 {
t.Fatalf("bad: %v", d)
}
if d.Key != "/foo" {
t.Fatalf("bad: %v", d)
}
if d.Flags != 42 {
t.Fatalf("bad: %v", d)
}
if string(d.Value) != "test" {
t.Fatalf("bad: %v", d)
}
// Update the entry
d = &structs.DirEntry{Key: "/foo", Flags: 43, Value: []byte("zip")}
if err := store.KVSSet(1010, d); err != nil {
t.Fatalf("err: %v", err)
}
// Should update
idx, d, err = store.KVSGet("/foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1010 {
t.Fatalf("bad: %v", idx)
}
if d.CreateIndex != 1000 {
t.Fatalf("bad: %v", d)
}
if d.ModifyIndex != 1010 {
t.Fatalf("bad: %v", d)
}
if d.Key != "/foo" {
t.Fatalf("bad: %v", d)
}
if d.Flags != 43 {
t.Fatalf("bad: %v", d)
}
if string(d.Value) != "zip" {
t.Fatalf("bad: %v", d)
}
}
func TestKVSDelete(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Create the entry
d := &structs.DirEntry{Key: "/foo", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
// Delete the entry
if err := store.KVSDelete(1020, "/foo"); err != nil {
t.Fatalf("err: %v", err)
}
// Should not exist
idx, d, err := store.KVSGet("/foo")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1020 {
t.Fatalf("bad: %v", idx)
}
if d != nil {
t.Fatalf("bad: %v", d)
}
}
func TestKVSCheckAndSet(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// CAS should fail, no entry
d := &structs.DirEntry{
ModifyIndex: 100,
Key: "/foo",
Flags: 42,
Value: []byte("test"),
}
ok, err := store.KVSCheckAndSet(1000, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("unexpected commit")
}
// Constrain on not-exist, should work
d.ModifyIndex = 0
ok, err = store.KVSCheckAndSet(1001, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("expected commit")
}
// Constrain on not-exist, should fail
d.ModifyIndex = 0
ok, err = store.KVSCheckAndSet(1002, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("unexpected commit")
}
// Constrain on a wrong modify time
d.ModifyIndex = 1000
ok, err = store.KVSCheckAndSet(1003, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if ok {
t.Fatalf("unexpected commit")
}
// Constrain on a correct modify time
d.ModifyIndex = 1001
ok, err = store.KVSCheckAndSet(1004, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("expected commit")
}
}
func TestKVS_List(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Should not exist
idx, ents, err := store.KVSList("/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 0 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 0 {
t.Fatalf("bad: %v", ents)
}
// Create the entries
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
// Should list
idx, ents, err = store.KVSList("/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1002 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 3 {
t.Fatalf("bad: %v", ents)
}
if ents[0].Key != "/web/a" {
t.Fatalf("bad: %v", ents[0])
}
if ents[1].Key != "/web/b" {
t.Fatalf("bad: %v", ents[1])
}
if ents[2].Key != "/web/sub/c" {
t.Fatalf("bad: %v", ents[2])
}
}
func TestKVSDeleteTree(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
// Should not exist
err = store.KVSDeleteTree(1000, "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
// Create the entries
d := &structs.DirEntry{Key: "/web/a", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1000, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/b", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1001, d); err != nil {
t.Fatalf("err: %v", err)
}
d = &structs.DirEntry{Key: "/web/sub/c", Flags: 42, Value: []byte("test")}
if err := store.KVSSet(1002, d); err != nil {
t.Fatalf("err: %v", err)
}
// Nuke the web tree
err = store.KVSDeleteTree(1010, "/web")
if err != nil {
t.Fatalf("err: %v", err)
}
// Nothing should list
idx, ents, err := store.KVSList("/web")
if err != nil {
t.Fatalf("err: %v", err)
}
if idx != 1010 {
t.Fatalf("bad: %v", idx)
}
if len(ents) != 0 {
t.Fatalf("bad: %v", ents)
}
}

View File

@ -18,6 +18,7 @@ type MessageType uint8
const (
RegisterRequestType MessageType = iota
DeregisterRequestType
KVSRequestType
)
const (
@ -172,6 +173,45 @@ type IndexedCheckServiceNodes struct {
Nodes CheckServiceNodes
}
// DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store.
type DirEntry struct {
CreateIndex uint64
ModifyIndex uint64
Key string
Flags uint64
Value []byte
}
type DirEntries []*DirEntry
type KVSOp string
const (
KVSSet KVSOp = "set"
KVSDelete = "delete"
KVSDeleteTree = "delete-tree"
KVSCAS = "cas" // Check-and-set
)
// KVSRequest is used to operate on the Key-Value store
type KVSRequest struct {
Datacenter string
Op KVSOp // Which operation are we performing
DirEnt DirEntry // Which directory entry
}
// KeyRequest is used to request a key, or key prefix
type KeyRequest struct {
Datacenter string
Key string
BlockingQuery
}
type IndexedDirEntries struct {
Index uint64
Entries DirEntries
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle

View File

@ -7,11 +7,12 @@ sidebar_current: "docs-agent-http"
# HTTP API
The main interface to Consul is a RESTful HTTP API. The API can be
used for CRUD for nodes, services, and checks. The endpoints are
used for CRUD for nodes, services, checks, and configuration. The endpoints are
versioned to enable changes without breaking backwards compatibility.
All endpoints fall into one of 4 categories:
All endpoints fall into one of 5 categories:
* kv - Key/Value store
* agent - Agent control
* catalog - Manages nodes and services
* health - Manages health checks
@ -28,7 +29,7 @@ Queries that support this will mention it specifically, however the use of this
feature is the same for all. If supported, the query will set an HTTP header
"X-Consul-Index". This is an opaque handle that the client will use.
To cause a query to block, the query parameters "?wait=<interval>&index=<idx>" are added
To cause a query to block, the query parameters "?wait=\<interval\>&index=\<idx\>" are added
to a request. The "?wait=" query parameter limits how long the query will potentially
block for. It not set, it will default to 10 minutes. It can be specified in the form of
"10s" or "5m", which is 10 seconds or 5 minutes respectively. The "?index=" parameter is an
@ -41,6 +42,72 @@ note is that when the query returns there is **no guarantee** of a change. It is
possible that the timeout was reached, or that there was an idempotent write that
does not affect the result.
## KV
The KV endpoint is used to expose a simple key/value store. This can be used
to store service configurations or other meta data in a simple way. It has only
a single endpoint:
/v1/kv/<key>
This is the only endpoint that is used with the Key/Value store.
It's use depends on the HTTP method. The `GET`, `PUT` and `DELETE` methods
are all supported.
When using the `GET` method, Consul will return the specified key,
or if the "?recurse" query parameter is provided, it will return
all keys with the given prefix.
Each object will look like:
[
{
"CreateIndex":100,
"ModifyIndex":200,
"Key":"zip",
"Flags":0,
"Value":"dGVzdA=="
}
]
The `CreateIndex` is the internal index value that represents
when the entry was created. The `ModifyIndex` is the last index
that modified this key. This index corresponds to the `X-Consul-Index`
header value that is returned. A blocking query can be used to wait for
a value to change. If "?recurse" is used, the `X-Consul-Index` corresponds
to the latest `ModifyIndex` and so a blocking query waits until any of the
listed keys are updated.
The `Key` is simply the full path of the entry. `Flags` are an opaque
unsigned integer that can be attached to each entry. The use of this is
left totally to the user. Lastly, the `Value` is a base64 key value.
If no entries are found, a 404 code is returned.
When using the `PUT` method, Consul expects the request body to be the
value corresponding to the key. There are a number of parameters that can
be used with a PUT request:
* ?flags=\<num\> : This can be used to specify an unsigned value between
0 and 2^64-1. It is opaque to the user, but a client application may
use it.
* ?cas=\<index\> : This flag is used to turn the `PUT` into a **Check-And-Set**
operation. This is very useful as it allows clients to build more complex
syncronization primitives on top. If the index is 0, then Consul will only
put the key if it does not already exist. If the index is non-zero, then
the key is only set if the index matches the `ModifyIndex` of that key.
The return value is simply either `true` or `false`. If the CAS check fails,
then `false` will be returned.
Lastly, the `DELETE` method can be used to delete a single key or all
keys sharing a prefix. If the "?recurse" query parameter is provided,
then all keys with the prefix are deleted, otherwise only the specified
key.
## Agent
The Agent endpoints are used to interact with a local Consul agent. Usually,

View File

@ -35,30 +35,34 @@ Here is an example output:
num_peers = 2
state = Leader
term = 4
serf-lan:
event-queue = 0
event-time = 2
serf_lan:
event_queue = 0
event_time = 2
failed = 0
intent-queue = 0
intent_queue = 0
left = 0
member-time = 7
member_time = 7
members = 3
serf-wan:
event-queue = 0
event-time = 1
query_queue = 0
query_time = 1
serf_wan:
event_queue = 0
event_time = 1
failed = 0
intent-queue = 0
intent_queue = 0
left = 0
member-time = 1
member_time = 1
members = 1
query_queue = 0
query_time = 1
There are currently the top-level keys for:
* agent: Provides information about the agent
* consul: Information about the consul library (client or server)
* raft: Provides info about the Raft [consensus library](/docs/internals/consensus.html)
* serf-lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
* serf-wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
* serf_lan: Provides info about the LAN [gossip pool](/docs/internals/gossip.html)
* serf_wan: Provides info about the WAN [gossip pool](/docs/internals/gossip.html)
## Usage