Adding FSM support for register/deregister health checks

This commit is contained in:
Armon Dadgar 2014-01-08 13:39:40 -08:00
parent 7c8993ef3f
commit ad346fecdf
5 changed files with 205 additions and 79 deletions

View file

@ -21,15 +21,26 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
return fmt.Errorf("Must provide node and address") return fmt.Errorf("Must provide node and address")
} }
if args.Service != nil {
// If no service id, but service name, use default // If no service id, but service name, use default
if args.ServiceID == "" && args.ServiceName != "" { if args.Service.ID == "" && args.Service.Service != "" {
args.ServiceID = args.ServiceName args.Service.ID = args.Service.Service
} }
// Verify ServiceName provided if ID // Verify ServiceName provided if ID
if args.ServiceID != "" && args.ServiceName == "" { if args.Service.ID != "" && args.Service.Service == "" {
return fmt.Errorf("Must provide service name with ID") return fmt.Errorf("Must provide service name with ID")
} }
}
if args.Check != nil {
if args.Check.CheckID == "" && args.Check.Name != "" {
args.Check.CheckID = args.Check.Name
}
if args.Check.Node == "" {
args.Check.Node = args.Node
}
}
_, err := c.srv.raftApply(structs.RegisterRequestType, args) _, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil { if err != nil {

View file

@ -21,9 +21,11 @@ func TestCatalogRegister(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceName: "db", Service: &structs.NodeService{
ServiceTag: "master", Service: "db",
ServicePort: 8000, Tag: "master",
Port: 8000,
},
} }
var out struct{} var out struct{}
@ -75,9 +77,11 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceName: "db", Service: &structs.NodeService{
ServiceTag: "master", Service: "db",
ServicePort: 8000, Tag: "master",
Port: 8000,
},
} }
var out struct{} var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil { if err := client.Call("Catalog.Register", &arg, &out); err != nil {
@ -110,9 +114,11 @@ func TestCatalogRegister_ForwardDC(t *testing.T) {
Datacenter: "dc2", // SHould forward through s1 Datacenter: "dc2", // SHould forward through s1
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceName: "db", Service: &structs.NodeService{
ServiceTag: "master", Service: "db",
ServicePort: 8000, Tag: "master",
Port: 8000,
},
} }
var out struct{} var out struct{}
if err := client.Call("Catalog.Register", &arg, &out); err != nil { if err := client.Call("Catalog.Register", &arg, &out); err != nil {
@ -354,9 +360,11 @@ func TestCatalogRegister_FailedCase1(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "bar", Node: "bar",
Address: "127.0.0.2", Address: "127.0.0.2",
ServiceName: "web", Service: &structs.NodeService{
ServiceTag: "", Service: "web",
ServicePort: 8000, Tag: "",
Port: 8000,
},
} }
var out struct{} var out struct{}

View file

@ -46,7 +46,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
buf := log.Data buf := log.Data
switch structs.MessageType(buf[0]) { switch structs.MessageType(buf[0]) {
case structs.RegisterRequestType: case structs.RegisterRequestType:
return c.applyRegister(buf[1:]) return c.decodeRegister(buf[1:])
case structs.DeregisterRequestType: case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:]) return c.applyDeregister(buf[1:])
default: default:
@ -54,21 +54,30 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
} }
} }
func (c *consulFSM) applyRegister(buf []byte) interface{} { func (c *consulFSM) decodeRegister(buf []byte) interface{} {
var req structs.RegisterRequest var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
return c.applyRegister(&req)
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} {
// Ensure the node // Ensure the node
node := structs.Node{req.Node, req.Address} node := structs.Node{req.Node, req.Address}
c.state.EnsureNode(node) c.state.EnsureNode(node)
// Ensure the service if provided // Ensure the service if provided
if req.ServiceID != "" && req.ServiceName != "" { if req.Service != nil {
c.state.EnsureService(req.Node, req.ServiceID, req.ServiceName, c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service,
req.ServiceTag, req.ServicePort) req.Service.Tag, req.Service.Port)
} }
// Ensure the check if provided
if req.Check != nil {
c.state.EnsureCheck(req.Check)
}
return nil return nil
} }
@ -81,6 +90,8 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} {
// Either remove the service entry or the whole node // Either remove the service entry or the whole node
if req.ServiceID != "" { if req.ServiceID != "" {
c.state.DeleteNodeService(req.Node, req.ServiceID) c.state.DeleteNodeService(req.Node, req.ServiceID)
} else if req.CheckID != "" {
c.state.DeleteNodeCheck(req.Node, req.CheckID)
} else { } else {
c.state.DeleteNode(req.Node) c.state.DeleteNode(req.Node)
} }
@ -108,6 +119,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err != nil { if err != nil {
return err return err
} }
c.state = state
// Create a decoder // Create a decoder
var handle codec.MsgpackHandle var handle codec.MsgpackHandle
@ -131,23 +143,13 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil { if err := dec.Decode(&req); err != nil {
return err return err
} }
c.applyRegister(&req)
// Register the service or the node
if req.ServiceName != "" {
state.EnsureService(req.Node, req.ServiceID, req.ServiceName,
req.ServiceTag, req.ServicePort)
} else {
node := structs.Node{req.Node, req.Address}
state.EnsureNode(node)
}
default: default:
return fmt.Errorf("Unrecognized msg type: %v", msgType) return fmt.Errorf("Unrecognized msg type: %v", msgType)
} }
} }
// Do an atomic flip, safe since Apply is not called concurrently
c.state = state
return nil return nil
} }
@ -176,12 +178,20 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register each service this node has // Register each service this node has
services := s.state.NodeServices(nodes[i].Node) services := s.state.NodeServices(nodes[i].Node)
for id, props := range services.Services { for _, srv := range services.Services {
req.ServiceID = id req.Service = srv
req.ServiceName = props.Service sink.Write([]byte{byte(structs.RegisterRequestType)})
req.ServiceTag = props.Tag if err := encoder.Encode(&req); err != nil {
req.ServicePort = props.Port sink.Cancel()
return err
}
}
// Register each check this node has
req.Service = nil
checks := s.state.NodeChecks(nodes[i].Node)
for _, check := range checks {
req.Check = check
sink.Write([]byte{byte(structs.RegisterRequestType)}) sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil { if err := encoder.Encode(&req); err != nil {
sink.Cancel() sink.Cancel()

View file

@ -77,10 +77,19 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tag: "master",
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: structs.HealthPassing,
ServiceID: "db", ServiceID: "db",
ServiceName: "db", },
ServiceTag: "master",
ServicePort: 8000,
} }
buf, err := structs.Encode(structs.RegisterRequestType, req) buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil { if err != nil {
@ -102,6 +111,12 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
if _, ok := services.Services["db"]; !ok { if _, ok := services.Services["db"]; !ok {
t.Fatalf("not registered!") t.Fatalf("not registered!")
} }
// Verify check
checks := fsm.state.NodeChecks("foo")
if checks[0].CheckID != "db" {
t.Fatalf("not registered!")
}
} }
func TestFSM_DeregisterService(t *testing.T) { func TestFSM_DeregisterService(t *testing.T) {
@ -114,10 +129,12 @@ func TestFSM_DeregisterService(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceID: "db", Service: &structs.NodeService{
ServiceName: "db", ID: "db",
ServiceTag: "master", Service: "db",
ServicePort: 8000, Tag: "master",
Port: 8000,
},
} }
buf, err := structs.Encode(structs.RegisterRequestType, req) buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil { if err != nil {
@ -156,6 +173,60 @@ func TestFSM_DeregisterService(t *testing.T) {
} }
} }
func TestFSM_DeregisterCheck(t *testing.T) {
fsm, err := NewFSM()
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: structs.HealthPassing,
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
CheckID: "mem",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); !found {
t.Fatalf("not found!")
}
// Verify check not registered
checks := fsm.state.NodeChecks("foo")
if len(checks) != 0 {
t.Fatalf("check registered!")
}
}
func TestFSM_DeregisterNode(t *testing.T) { func TestFSM_DeregisterNode(t *testing.T) {
fsm, err := NewFSM() fsm, err := NewFSM()
if err != nil { if err != nil {
@ -166,10 +237,19 @@ func TestFSM_DeregisterNode(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tag: "master",
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: structs.HealthPassing,
ServiceID: "db", ServiceID: "db",
ServiceName: "db", },
ServiceTag: "master",
ServicePort: 8000,
} }
buf, err := structs.Encode(structs.RegisterRequestType, req) buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil { if err != nil {
@ -205,6 +285,12 @@ func TestFSM_DeregisterNode(t *testing.T) {
if len(services.Services) != 0 { if len(services.Services) != 0 {
t.Fatalf("Services: %v", services) t.Fatalf("Services: %v", services)
} }
// Verify checks not registered
checks := fsm.state.NodeChecks("foo")
if len(checks) != 0 {
t.Fatalf("Services: %v", services)
}
} }
func TestFSM_SnapshotRestore(t *testing.T) { func TestFSM_SnapshotRestore(t *testing.T) {
@ -220,6 +306,13 @@ func TestFSM_SnapshotRestore(t *testing.T) {
fsm.state.EnsureService("foo", "db", "db", "primary", 5000) fsm.state.EnsureService("foo", "db", "db", "primary", 5000)
fsm.state.EnsureService("baz", "web", "web", "", 80) fsm.state.EnsureService("baz", "web", "web", "", 80)
fsm.state.EnsureService("baz", "db", "db", "secondary", 5000) fsm.state.EnsureService("baz", "db", "db", "secondary", 5000)
fsm.state.EnsureCheck(&structs.HealthCheck{
Node: "foo",
CheckID: "web",
Name: "web connectivity",
Status: structs.HealthPassing,
ServiceID: "web",
})
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()
@ -262,4 +355,9 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if fooSrv.Services["db"].Port != 5000 { if fooSrv.Services["db"].Port != 5000 {
t.Fatalf("Bad: %v", fooSrv) t.Fatalf("Bad: %v", fooSrv)
} }
checks := fsm2.state.NodeChecks("foo")
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
} }

View file

@ -33,10 +33,8 @@ type RegisterRequest struct {
Datacenter string Datacenter string
Node string Node string
Address string Address string
ServiceID string Service *NodeService
ServiceName string Check *HealthCheck
ServiceTag string
ServicePort int
} }
// DeregisterRequest is used for the Catalog.Deregister endpoint // DeregisterRequest is used for the Catalog.Deregister endpoint
@ -46,6 +44,7 @@ type DeregisterRequest struct {
Datacenter string Datacenter string
Node string Node string
ServiceID string ServiceID string
CheckID string
} }
// Used to return information about a node // Used to return information about a node