diff --git a/command/agent/agent.go b/command/agent/agent.go index 417c4552e..14db1ea3d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -549,22 +549,6 @@ func (a *Agent) WANMembers() []serf.Member { } } -// CanServersUnderstandProtocol checks to see if all the servers understand the -// given protocol version. -func (a *Agent) CanServersUnderstandProtocol(version uint8) bool { - numServers, numWhoGrok := 0, 0 - members := a.LANMembers() - for _, member := range members { - if member.Tags["role"] == "consul" { - numServers++ - if member.ProtocolMax >= version { - numWhoGrok++ - } - } - } - return (numServers > 0) && (numWhoGrok == numServers) -} - // StartSync is called once Services and Checks are registered. // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { @@ -603,13 +587,19 @@ func (a *Agent) sendCoordinate() { select { case <-time.After(intv): - if !a.CanServersUnderstandProtocol(3) { + members := a.LANMembers() + grok, err := consul.CanServersUnderstandProtocol(members, 3) + if err != nil { + a.logger.Printf("[ERR] agent: failed to check servers: %s", err) + continue + } + if !grok { + a.logger.Printf("[DEBUG] agent: skipping coordinate updates until servers are upgraded") continue } - var c *coordinate.Coordinate - var err error - if c, err = a.GetCoordinate(); err != nil { + c, err := a.GetCoordinate() + if err != nil { a.logger.Printf("[ERR] agent: failed to get coordinate: %s", err) continue } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index cf1f6c0b6..24fad74af 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1602,30 +1602,3 @@ func TestAgent_GetCoordinate(t *testing.T) { check(true) check(false) } - -func TestAgent_CanServersUnderstandProtocol(t *testing.T) { - config := nextConfig() - dir, agent := makeAgent(t, config) - defer os.RemoveAll(dir) - defer agent.Shutdown() - - min := uint8(consul.ProtocolVersionMin) - if !agent.CanServersUnderstandProtocol(min) { - t.Fatalf("should grok %d", min) - } - - max := uint8(consul.ProtocolVersionMax) - if !agent.CanServersUnderstandProtocol(max) { - t.Fatalf("should grok %d", max) - } - - current := uint8(config.Protocol) - if !agent.CanServersUnderstandProtocol(current) { - t.Fatalf("should grok %d", current) - } - - future := max + 1 - if agent.CanServersUnderstandProtocol(future) { - t.Fatalf("should not grok %d", future) - } -} diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 4f429bead..db156df5d 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -87,8 +87,12 @@ func (c *Coordinate) batchApplyUpdates() error { end = size } + // We set the "safe to ignore" flag on this update type so old + // servers don't crash if they see one of these. + t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag + slice := updates[start:end] - if _, err := c.srv.raftApply(structs.CoordinateBatchUpdateType, slice); err != nil { + if _, err := c.srv.raftApply(t, slice); err != nil { return err } } diff --git a/consul/util.go b/consul/util.go index f5a29a49b..8dd51c091 100644 --- a/consul/util.go +++ b/consul/util.go @@ -95,6 +95,35 @@ func ensurePath(path string, dir bool) error { return os.MkdirAll(path, 0755) } +// CanServersUnderstandProtocol checks to see if all the servers in the given +// list understand the given protocol version. If there are no servers in the +// list then this will return false. +func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, error) { + numServers, numWhoGrok := 0, 0 + for _, m := range members { + if m.Tags["role"] != "consul" { + continue + } + numServers++ + + vsn_min, err := strconv.Atoi(m.Tags["vsn_min"]) + if err != nil { + return false, err + } + + vsn_max, err := strconv.Atoi(m.Tags["vsn_max"]) + if err != nil { + return false, err + } + + v := int(version) + if (v >= vsn_min) && (v <= vsn_max) { + numWhoGrok++ + } + } + return (numServers > 0) && (numWhoGrok == numServers), nil +} + // Returns if a member is a consul server. Returns a bool, // the datacenter, and the rpc port func isConsulServer(m serf.Member) (bool, *serverParts) { diff --git a/consul/util_test.go b/consul/util_test.go index d2f08c397..88a2c0393 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -2,6 +2,7 @@ package consul import ( "errors" + "fmt" "net" "regexp" "testing" @@ -118,6 +119,103 @@ func TestIsPrivateIP(t *testing.T) { } } +func TestUtil_CanServersUnderstandProtocol(t *testing.T) { + var members []serf.Member + + // All empty list cases should return false. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("empty list should always return false") + } + } + + // Add a non-server member. + members = append(members, serf.Member{ + Tags: map[string]string{ + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), + }, + }) + + // Make sure it doesn't get counted. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("non-server members should not be counted") + } + } + + // Add a server member. + members = append(members, serf.Member{ + Tags: map[string]string{ + "role": "consul", + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax), + }, + }) + + // Now it should report that it understands. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if !grok { + t.Fatalf("server should grok") + } + } + + // Nobody should understand anything from the future. + for v := uint8(ProtocolVersionMax + 1); v <= uint8(ProtocolVersionMax+10); v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("server should not grok") + } + } + + // Add an older server. + members = append(members, serf.Member{ + Tags: map[string]string{ + "role": "consul", + "vsn_min": fmt.Sprintf("%d", ProtocolVersionMin), + "vsn_max": fmt.Sprintf("%d", ProtocolVersionMax-1), + }, + }) + + // The servers should no longer understand the max version. + for v := ProtocolVersionMin; v <= ProtocolVersionMax; v++ { + grok, err := CanServersUnderstandProtocol(members, v) + if err != nil { + t.Fatalf("err: %v", err) + } + expected := v < ProtocolVersionMax + if grok != expected { + t.Fatalf("bad: %v != %v", grok, expected) + } + } + + // Try a version that's too low for the minimum. + { + grok, err := CanServersUnderstandProtocol(members, 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if grok { + t.Fatalf("server should not grok") + } + } +} + func TestIsConsulServer(t *testing.T) { m := serf.Member{ Name: "foo",