From c1305a08ea4cba26463c8c3c4879efa5f8c15f35 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 27 Oct 2015 14:30:29 -0700 Subject: [PATCH 1/3] Makes the version upshift code look at the correct version field. --- command/agent/agent.go | 30 +++++-------- command/agent/agent_test.go | 27 ------------ consul/util.go | 24 +++++++++++ consul/util_test.go | 84 +++++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 47 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 417c4552e..14db1ea3d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -549,22 +549,6 @@ func (a *Agent) WANMembers() []serf.Member { } } -// CanServersUnderstandProtocol checks to see if all the servers understand the -// given protocol version. -func (a *Agent) CanServersUnderstandProtocol(version uint8) bool { - numServers, numWhoGrok := 0, 0 - members := a.LANMembers() - for _, member := range members { - if member.Tags["role"] == "consul" { - numServers++ - if member.ProtocolMax >= version { - numWhoGrok++ - } - } - } - return (numServers > 0) && (numWhoGrok == numServers) -} - // StartSync is called once Services and Checks are registered. // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { @@ -603,13 +587,19 @@ func (a *Agent) sendCoordinate() { select { case <-time.After(intv): - if !a.CanServersUnderstandProtocol(3) { + members := a.LANMembers() + grok, err := consul.CanServersUnderstandProtocol(members, 3) + if err != nil { + a.logger.Printf("[ERR] agent: failed to check servers: %s", err) + continue + } + if !grok { + a.logger.Printf("[DEBUG] agent: skipping coordinate updates until servers are upgraded") continue } - var c *coordinate.Coordinate - var err error - if c, err = a.GetCoordinate(); err != nil { + c, err := a.GetCoordinate() + if err != nil { a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err) continue } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index cf1f6c0b6..24fad74af 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1602,30 +1602,3 @@ func TestAgent_GetCoordinate(t *testing.T) { check(true) check(false) } - -func TestAgent_CanServersUnderstandProtocol(t *testing.T) { - config := nextConfig() - dir, agent := makeAgent(t, config) - defer os.RemoveAll(dir) - defer agent.Shutdown() - - min := uint8(consul.ProtocolVersionMin) - if !agent.CanServersUnderstandProtocol(min) { - t.Fatalf("should grok %d", min) - } - - max := uint8(consul.ProtocolVersionMax) - if !agent.CanServersUnderstandProtocol(max) { - t.Fatalf("should grok %d", max) - } - - current := uint8(config.Protocol) - if !agent.CanServersUnderstandProtocol(current) { - t.Fatalf("should grok %d", current) - } - - future := max + 1 - if agent.CanServersUnderstandProtocol(future) { - t.Fatalf("should not grok %d", future) - } -} diff --git a/consul/util.go b/consul/util.go index f5a29a49b..e59feeb2f 100644 --- a/consul/util.go +++ b/consul/util.go @@ -95,6 +95,30 @@ func ensurePath(path string, dir bool) error { return os.MkdirAll(path, 0755) } +// CanServersUnderstandProtocol checks to see if all the servers in the given +// list understand the given protocol version or higher. If there are no servers +// in the list then this will return false. +func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, error) { + numServers, numWhoGrok := 0, 0 + for _, m := range members { + if m.Tags["role"] != "consul" { + continue + } + numServers++ + + vsn_str := m.Tags["vsn_max"] + vsn, err := strconv.Atoi(vsn_str) + if err != nil { + return false, err + } + + if vsn >= int(version) { + numWhoGrok++ + } + } + return (numServers > 0) && (numWhoGrok == numServers), nil +} + // Returns if a member is a consul server. Returns a bool, // the datacenter, and the rpc port func isConsulServer(m serf.Member) (bool, *serverParts) { diff --git a/consul/util_test.go b/consul/util_test.go index d2f08c397..8b577ee64 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -2,6 +2,7 @@ package consul import ( "errors" + "fmt" "net" "regexp" "testing" @@ -118,6 +119,89 @@ func TestIsPrivateIP(t *testing.T) { } } +func TestUtil_CanServersUnderstandProtocol(t *testing.T) { + var members []serf.Member + + // All empty list cases should return false. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("empty list should always return false") + } + } + + // Add a non-server member. + members = append(members, serf.Member{ + Tags: map[string]string{ + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), + }, + }) + + // Make sure it doesn't get counted. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("non-server members should not be counted") + } + } + + // Add a server member. + members = append(members, serf.Member{ + Tags: map[string]string{ + "role": "consul", + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), + }, + }) + + // Now it should report that it understands. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if !grok { + t.Fatalf("server should grok") + } + } + + // Nobody should understand anything from the future. + for v := uint8(ProtocolVersionMax + 1); v <= uint8(ProtocolVersionMax+10); v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("server should not grok") + } + } + + // Add an older server. + members = append(members, serf.Member{ + Tags: map[string]string{ + "role": "consul", + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax-1), + }, + }) + + // The servers should no longer understand the max version. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + expected := v < ProtocolVersionMax + if grok != expected { + t.Fatalf("bad: %v != %v", grok, expected) + } + } +} + func TestIsConsulServer(t *testing.T) { m := serf.Member{ Name: "foo", From 1c678effde9b9ea684ab5705b4568d5c463db9c8 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 27 Oct 2015 14:41:33 -0700 Subject: [PATCH 2/3] Sets the ignore flag on coordinate update Raft log entries. --- consul/coordinate_endpoint.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 4f429bead..db156df5d 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -87,8 +87,12 @@ func (c *Coordinate) batchApplyUpdates() error { end = size } + // We set the "safe to ignore" flag on this update type so old + // servers don't crash if they see one of these. + t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag + slice := updates[start:end] - if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, slice); err != nil { + if _, err := c.srv.raftApply(t, slice); err != nil { return err } } From 5449096bc21fbd3578632880adf8c746bc5ff926 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 27 Oct 2015 15:56:36 -0700 Subject: [PATCH 3/3] Adds a check for the minimum version as well. --- consul/util.go | 15 ++++++++++----- consul/util_test.go | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/consul/util.go b/consul/util.go index e59feeb2f..8dd51c091 100644 --- a/consul/util.go +++ b/consul/util.go @@ -96,8 +96,8 @@ func ensurePath(path string, dir bool) error { } // CanServersUnderstandProtocol checks to see if all the servers in the given -// list understand the given protocol version or higher. If there are no servers -// in the list then this will return false. +// list understand the given protocol version. If there are no servers in the +// list then this will return false. func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, error) { numServers, numWhoGrok := 0, 0 for _, m := range members { @@ -106,13 +106,18 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e } numServers++ - vsn_str := m.Tags["vsn_max"] - vsn, err := strconv.Atoi(vsn_str) + vsn_min, err := strconv.Atoi(m.Tags["vsn_min"]) if err != nil { return false, err } - if vsn >= int(version) { + vsn_max, err := strconv.Atoi(m.Tags["vsn_max"]) + if err != nil { + return false, err + } + + v := int(version) + if (v >= vsn_min) && (v <= vsn_max) { numWhoGrok++ } } diff --git a/consul/util_test.go b/consul/util_test.go index 8b577ee64..88a2c0393 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -136,6 +136,7 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) { // Add a non-server member. members = append(members, serf.Member{ Tags: map[string]string{ + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), }, }) @@ -155,6 +156,7 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) { members = append(members, serf.Member{ Tags: map[string]string{ "role": "consul", + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), }, }) @@ -185,6 +187,7 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) { members = append(members, serf.Member{ Tags: map[string]string{ "role": "consul", + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax-1), }, }) @@ -200,6 +203,17 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) { t.Fatalf("bad: %v != %v", grok, expected) } } + + // Try a version that's too low for the minimum. + { + grok, err := CanServersUnderstandProtocol(members, 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("server should not grok") + } + } } func TestIsConsulServer(t *testing.T) {