diff --git a/consul/server.go b/consul/server.go index d873cd2b9..598e6bde3 100644 --- a/consul/server.go +++ b/consul/server.go @@ -108,6 +108,7 @@ type endpoints struct { Raft *Raft Status *Status KVS *KVS + Session *Session Internal *Internal } @@ -316,6 +317,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error { s.endpoints.Catalog = &Catalog{s} s.endpoints.Health = &Health{s} s.endpoints.KVS = &KVS{s} + s.endpoints.Session = &Session{s} s.endpoints.Internal = &Internal{s} // Register the handlers @@ -324,6 +326,7 @@ func (s *Server) setupRPC(tlsConfig *tls.Config) error { s.rpcServer.Register(s.endpoints.Catalog) s.rpcServer.Register(s.endpoints.Health) s.rpcServer.Register(s.endpoints.KVS) + s.rpcServer.Register(s.endpoints.Session) s.rpcServer.Register(s.endpoints.Internal) list, err := net.ListenTCP("tcp", s.config.RPCAddr) diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go new file mode 100644 index 000000000..79938fc18 --- /dev/null +++ b/consul/session_endpoint.go @@ -0,0 +1,106 @@ +package consul + +import ( + "fmt" + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/structs" + "time" +) + +// Session endpoint is used to manipulate sessions for KV +type Session struct { + srv *Server +} + +// Apply is used to apply a modifying request to the data store. This should +// only be used for operations that modify the data +func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { + if done, err := s.srv.forward("Session.Apply", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"consul", "session", "apply"}, time.Now()) + + // Verify the args + if args.Session.ID == "" && args.Op == structs.SessionDestroy { + return fmt.Errorf("Must provide ID") + } + if args.Session.Node == "" && args.Op == structs.SessionCreate { + return fmt.Errorf("Must provide Node") + } + + // Apply the update + resp, err := s.srv.raftApply(structs.SessionRequestType, args) + if err != nil { + s.srv.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a string + if respString, ok := resp.(string); ok { + *reply = respString + } + return nil +} + +// Get is used to retrieve a single session +func (s *Session) Get(args *structs.SessionGetRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.Get", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("SessionGet"), + func() error { + index, session, err := state.SessionGet(args.Session) + reply.Index = index + if session != nil { + reply.Sessions = structs.Sessions{session} + } + return err + }) +} + +// List is used to list all the active sessions +func (s *Session) List(args *structs.DCSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.List", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("SessionList"), + func() error { + var err error + reply.Index, reply.Sessions, err = state.SessionList() + return err + }) +} + +// NodeSessions is used to get all the sessions for a particular node +func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + return s.srv.blockingRPC(&args.QueryOptions, + &reply.QueryMeta, + state.QueryTables("NodeSessions"), + func() error { + var err error + reply.Index, reply.Sessions, err = state.NodeSessions(args.Node) + return err + }) +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 10c423b03..d2e5aba24 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -348,6 +348,7 @@ type Session struct { Checks []string CreateIndex uint64 } +type Sessions []*Session type SessionOp string @@ -368,6 +369,22 @@ func (r *SessionRequest) RequestDatacenter() string { return r.Datacenter } +// SessionGetRequest is used to request a session by ID +type SessionGetRequest struct { + Datacenter string + Session string + QueryOptions +} + +func (r *SessionGetRequest) RequestDatacenter() string { + return r.Datacenter +} + +type IndexedSessions struct { + Sessions Sessions + QueryMeta +} + // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { var handle codec.MsgpackHandle