agent: First pass at KVS endpoints

This commit is contained in:
Armon Dadgar 2014-03-31 17:12:10 -07:00
parent 5def21491e
commit 5af036704d
3 changed files with 158 additions and 1 deletions

View file

@ -88,6 +88,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService)) s.mux.HandleFunc("/v1/agent/service/deregister", s.wrap(s.AgentDeregisterService))
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
if enableDebug { if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

View file

@ -8,6 +8,7 @@ register new services.
The URLs are also versioned to allow for changes in the API. The URLs are also versioned to allow for changes in the API.
The current URLs supported are: The current URLs supported are:
Catalog:
* /v1/catalog/register : Registers a new service * /v1/catalog/register : Registers a new service
* /v1/catalog/deregister : Deregisters a service or node * /v1/catalog/deregister : Deregisters a service or node
* /v1/catalog/datacenters : Lists known datacenters * /v1/catalog/datacenters : Lists known datacenters
@ -16,15 +17,17 @@ The current URLs supported are:
* /v1/catalog/service/<service>/ : Lists the nodes in a given service * /v1/catalog/service/<service>/ : Lists the nodes in a given service
* /v1/catalog/node/<node>/ : Lists the services provided by a node * /v1/catalog/node/<node>/ : Lists the services provided by a node
* Health system: Health system:
* /v1/health/node/<node>: Returns the health info of a node * /v1/health/node/<node>: Returns the health info of a node
* /v1/health/checks/<service>: Returns the checks of a service * /v1/health/checks/<service>: Returns the checks of a service
* /v1/health/service/<service>: Returns the nodes and health info of a service * /v1/health/service/<service>: Returns the nodes and health info of a service
* /v1/health/state/<state>: Returns the checks in a given state * /v1/health/state/<state>: Returns the checks in a given state
Status:
* /v1/status/leader : Returns the current Raft leader * /v1/status/leader : Returns the current Raft leader
* /v1/status/peers : Returns the current Raft peer set * /v1/status/peers : Returns the current Raft peer set
Agent:
* /v1/agent/checks: Returns the checks the local agent is managing * /v1/agent/checks: Returns the checks the local agent is managing
* /v1/agent/services : Returns the services local agent is managing * /v1/agent/services : Returns the services local agent is managing
* /v1/agent/members : Returns the members as seen by the local serf agent * /v1/agent/members : Returns the members as seen by the local serf agent
@ -37,3 +40,7 @@ The current URLs supported are:
* /v1/agent/check/fail/<name> * /v1/agent/check/fail/<name>
* /v1/agent/service/register * /v1/agent/service/register
* /v1/agent/service/deregister/<name> * /v1/agent/service/deregister/<name>
KVS:
* /v1/kv/<key>

View file

@ -0,0 +1,148 @@
package agent
import (
"bytes"
"github.com/hashicorp/consul/consul/structs"
"io"
"net/http"
"strconv"
"strings"
)
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
args := structs.KeyRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.BlockingQuery); done {
return nil, nil
}
// Pull out the key name, validation left to each sub-handler
args.Key = strings.TrimPrefix(req.URL.Path, "/v1/kv/")
// Switch on the method
switch req.Method {
case "GET":
return s.KVSGet(resp, req, &args)
case "PUT":
return s.KVSPut(resp, req, &args)
case "DELETE":
return s.KVSDelete(resp, req, &args)
default:
resp.WriteHeader(405)
return nil, nil
}
return nil, nil
}
// KVSGet handles a GET request
func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
// Check for recurse
method := "KVS.Get"
params := req.URL.Query()
if _, ok := params["recurse"]; ok {
method = "KVS.List"
} else if missingKey(resp, args) {
return nil, nil
}
// Make the RPC
var out structs.IndexedDirEntries
if err := s.agent.RPC(method, &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out.Entries, nil
}
// KVSPut handles a PUT request
func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
if missingKey(resp, args) {
return nil, nil
}
applyReq := structs.KVSRequest{
Datacenter: args.Datacenter,
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: args.Key,
Flags: 0,
Value: nil,
},
}
// Check for flags
params := req.URL.Query()
if _, ok := params["flags"]; ok {
flagVal, err := strconv.ParseUint(params.Get("flags"), 10, 64)
if err != nil {
return nil, err
}
applyReq.DirEnt.Flags = flagVal
}
// Check for cas value
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("flags"), 10, 64)
if err != nil {
return nil, err
}
applyReq.DirEnt.ModifyIndex = casVal
applyReq.Op = structs.KVSCAS
}
// Copy the value
buf := bytes.NewBuffer(nil)
if _, err := io.Copy(buf, req.Body); err != nil {
return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()
// Make the RPC
var out bool
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
// Only use the out value if this was a CAS
if applyReq.Op == structs.KVSSet {
return true, nil
} else {
return out, nil
}
}
// KVSPut handles a DELETE request
func (s *HTTPServer) KVSDelete(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
if missingKey(resp, args) {
return nil, nil
}
applyReq := structs.KVSRequest{
Datacenter: args.Datacenter,
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: args.Key,
},
}
// Check for recurse
params := req.URL.Query()
if _, ok := params["recurse"]; ok {
applyReq.Op = structs.KVSDeleteTree
}
// Make the RPC
var out bool
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
return nil, err
}
return nil, nil
}
// missingKey checks if the key is missing
func missingKey(resp http.ResponseWriter, args *structs.KeyRequest) bool {
if args.Key == "" {
resp.WriteHeader(400)
resp.Write([]byte("Missing key name"))
return true
}
return false
}