From ad346fecdfc437ebb6b933da6d84f41bf314d326 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 8 Jan 2014 13:39:40 -0800 Subject: [PATCH] Adding FSM support for register/deregister health checks --- consul/catalog_endpoint.go | 23 ++++-- consul/catalog_endpoint_test.go | 56 +++++++------ consul/fsm.go | 52 +++++++----- consul/fsm_test.go | 140 +++++++++++++++++++++++++++----- consul/structs/structs.go | 13 ++- 5 files changed, 205 insertions(+), 79 deletions(-) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 27a7a8475..29ca221fb 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -21,14 +21,25 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error return fmt.Errorf("Must provide node and address") } - // If no service id, but service name, use default - if args.ServiceID == "" && args.ServiceName != "" { - args.ServiceID = args.ServiceName + if args.Service != nil { + // If no service id, but service name, use default + if args.Service.ID == "" && args.Service.Service != "" { + args.Service.ID = args.Service.Service + } + + // Verify ServiceName provided if ID + if args.Service.ID != "" && args.Service.Service == "" { + return fmt.Errorf("Must provide service name with ID") + } } - // Verify ServiceName provided if ID - if args.ServiceID != "" && args.ServiceName == "" { - return fmt.Errorf("Must provide service name with ID") + if args.Check != nil { + if args.Check.CheckID == "" && args.Check.Name != "" { + args.Check.CheckID = args.Check.Name + } + if args.Check.Node == "" { + args.Check.Node = args.Node + } } _, err := c.srv.raftApply(structs.RegisterRequestType, args) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 485b500a4..d9cd7c59a 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -18,12 +18,14 @@ func TestCatalogRegister(t *testing.T) { defer client.Close() arg := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Service: "db", + Tag: "master", + Port: 8000, + }, } var out struct{} @@ -72,12 +74,14 @@ func TestCatalogRegister_ForwardLeader(t *testing.T) { } arg := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Service: "db", + Tag: "master", + Port: 8000, + }, } var out struct{} if err := client.Call("Catalog.Register", &arg, &out); err != nil { @@ -107,12 +111,14 @@ func TestCatalogRegister_ForwardDC(t *testing.T) { time.Sleep(100 * time.Millisecond) arg := structs.RegisterRequest{ - Datacenter: "dc2", // SHould forward through s1 - Node: "foo", - Address: "127.0.0.1", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc2", // SHould forward through s1 + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Service: "db", + Tag: "master", + Port: 8000, + }, } var out struct{} if err := client.Call("Catalog.Register", &arg, &out); err != nil { @@ -351,12 +357,14 @@ func TestCatalogRegister_FailedCase1(t *testing.T) { defer client.Close() arg := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "bar", - Address: "127.0.0.2", - ServiceName: "web", - ServiceTag: "", - ServicePort: 8000, + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + Service: "web", + Tag: "", + Port: 8000, + }, } var out struct{} diff --git a/consul/fsm.go b/consul/fsm.go index 8ab1fedff..fba8b1511 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -46,7 +46,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { buf := log.Data switch structs.MessageType(buf[0]) { case structs.RegisterRequestType: - return c.applyRegister(buf[1:]) + return c.decodeRegister(buf[1:]) case structs.DeregisterRequestType: return c.applyDeregister(buf[1:]) default: @@ -54,21 +54,30 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} { } } -func (c *consulFSM) applyRegister(buf []byte) interface{} { +func (c *consulFSM) decodeRegister(buf []byte) interface{} { var req structs.RegisterRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + return c.applyRegister(&req) +} +func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} { // Ensure the node node := structs.Node{req.Node, req.Address} c.state.EnsureNode(node) // Ensure the service if provided - if req.ServiceID != "" && req.ServiceName != "" { - c.state.EnsureService(req.Node, req.ServiceID, req.ServiceName, - req.ServiceTag, req.ServicePort) + if req.Service != nil { + c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service, + req.Service.Tag, req.Service.Port) } + + // Ensure the check if provided + if req.Check != nil { + c.state.EnsureCheck(req.Check) + } + return nil } @@ -81,6 +90,8 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} { // Either remove the service entry or the whole node if req.ServiceID != "" { c.state.DeleteNodeService(req.Node, req.ServiceID) + } else if req.CheckID != "" { + c.state.DeleteNodeCheck(req.Node, req.CheckID) } else { c.state.DeleteNode(req.Node) } @@ -108,6 +119,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err != nil { return err } + c.state = state // Create a decoder var handle codec.MsgpackHandle @@ -131,23 +143,13 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { if err := dec.Decode(&req); err != nil { return err } - - // Register the service or the node - if req.ServiceName != "" { - state.EnsureService(req.Node, req.ServiceID, req.ServiceName, - req.ServiceTag, req.ServicePort) - } else { - node := structs.Node{req.Node, req.Address} - state.EnsureNode(node) - } + c.applyRegister(&req) default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } } - // Do an atomic flip, safe since Apply is not called concurrently - c.state = state return nil } @@ -176,12 +178,20 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register each service this node has services := s.state.NodeServices(nodes[i].Node) - for id, props := range services.Services { - req.ServiceID = id - req.ServiceName = props.Service - req.ServiceTag = props.Tag - req.ServicePort = props.Port + for _, srv := range services.Services { + req.Service = srv + sink.Write([]byte{byte(structs.RegisterRequestType)}) + if err := encoder.Encode(&req); err != nil { + sink.Cancel() + return err + } + } + // Register each check this node has + req.Service = nil + checks := s.state.NodeChecks(nodes[i].Node) + for _, check := range checks { + req.Check = check sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { sink.Cancel() diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 87cd782dc..ca3b5195a 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -74,13 +74,22 @@ func TestFSM_RegisterNode_Service(t *testing.T) { } req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - ServiceID: "db", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tag: "master", + Port: 8000, + }, + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "db", + Name: "db connectivity", + Status: structs.HealthPassing, + ServiceID: "db", + }, } buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { @@ -102,6 +111,12 @@ func TestFSM_RegisterNode_Service(t *testing.T) { if _, ok := services.Services["db"]; !ok { t.Fatalf("not registered!") } + + // Verify check + checks := fsm.state.NodeChecks("foo") + if checks[0].CheckID != "db" { + t.Fatalf("not registered!") + } } func TestFSM_DeregisterService(t *testing.T) { @@ -111,13 +126,15 @@ func TestFSM_DeregisterService(t *testing.T) { } req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - ServiceID: "db", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tag: "master", + Port: 8000, + }, } buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { @@ -156,6 +173,60 @@ func TestFSM_DeregisterService(t *testing.T) { } } +func TestFSM_DeregisterCheck(t *testing.T) { + fsm, err := NewFSM() + if err != nil { + t.Fatalf("err: %v", err) + } + + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "mem", + Name: "memory util", + Status: structs.HealthPassing, + }, + } + buf, err := structs.Encode(structs.RegisterRequestType, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + dereg := structs.DeregisterRequest{ + Datacenter: "dc1", + Node: "foo", + CheckID: "mem", + } + buf, err = structs.Encode(structs.DeregisterRequestType, dereg) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + + // Verify we are registered + if found, _ := fsm.state.GetNode("foo"); !found { + t.Fatalf("not found!") + } + + // Verify check not registered + checks := fsm.state.NodeChecks("foo") + if len(checks) != 0 { + t.Fatalf("check registered!") + } +} + func TestFSM_DeregisterNode(t *testing.T) { fsm, err := NewFSM() if err != nil { @@ -163,13 +234,22 @@ func TestFSM_DeregisterNode(t *testing.T) { } req := structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - ServiceID: "db", - ServiceName: "db", - ServiceTag: "master", - ServicePort: 8000, + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: "db", + Service: "db", + Tag: "master", + Port: 8000, + }, + Check: &structs.HealthCheck{ + Node: "foo", + CheckID: "db", + Name: "db connectivity", + Status: structs.HealthPassing, + ServiceID: "db", + }, } buf, err := structs.Encode(structs.RegisterRequestType, req) if err != nil { @@ -205,6 +285,12 @@ func TestFSM_DeregisterNode(t *testing.T) { if len(services.Services) != 0 { t.Fatalf("Services: %v", services) } + + // Verify checks not registered + checks := fsm.state.NodeChecks("foo") + if len(checks) != 0 { + t.Fatalf("Services: %v", services) + } } func TestFSM_SnapshotRestore(t *testing.T) { @@ -220,6 +306,13 @@ func TestFSM_SnapshotRestore(t *testing.T) { fsm.state.EnsureService("foo", "db", "db", "primary", 5000) fsm.state.EnsureService("baz", "web", "web", "", 80) fsm.state.EnsureService("baz", "db", "db", "secondary", 5000) + fsm.state.EnsureCheck(&structs.HealthCheck{ + Node: "foo", + CheckID: "web", + Name: "web connectivity", + Status: structs.HealthPassing, + ServiceID: "web", + }) // Snapshot snap, err := fsm.Snapshot() @@ -262,4 +355,9 @@ func TestFSM_SnapshotRestore(t *testing.T) { if fooSrv.Services["db"].Port != 5000 { t.Fatalf("Bad: %v", fooSrv) } + + checks := fsm2.state.NodeChecks("foo") + if len(checks) != 1 { + t.Fatalf("Bad: %v", checks) + } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index d87f9acd8..f954e99af 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -30,13 +30,11 @@ const ( // to register a node as providing a service. If no service // is provided, the node is registered. type RegisterRequest struct { - Datacenter string - Node string - Address string - ServiceID string - ServiceName string - ServiceTag string - ServicePort int + Datacenter string + Node string + Address string + Service *NodeService + Check *HealthCheck } // DeregisterRequest is used for the Catalog.Deregister endpoint @@ -46,6 +44,7 @@ type DeregisterRequest struct { Datacenter string Node string ServiceID string + CheckID string } // Used to return information about a node