From 32c2d1b2171b1deb3cb20c7b26a76b7f133d0ac1 Mon Sep 17 00:00:00 2001
From: Frank Schroeder
Date: Mon, 28 Aug 2017 14:17:16 +0200
Subject: [PATCH] local state: fix anti-entropy state tests

The anti-entropy tests relied on the side effect of the StartSync()
method performing a full sync instead of a partial sync. This led to
multiple anti-entropy goroutines being started and to unnecessary
retry loops. This change performs synchronous full syncs where
necessary, removing the need for all of the time.Sleep calls and most
of the retry loops.
---
 agent/ae/ae.go            |  16 +++---
 agent/local/state.go      |  22 ++++++--
 agent/local/state_test.go | 116 +++++++++++++++++---------------------
 agent/testagent.go        |  12 ++--
 4 files changed, 82 insertions(+), 84 deletions(-)

diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index f3b674503..b055f7aa1 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -51,8 +51,8 @@ type StateSyncer struct {

 	// State contains the data that needs to be synchronized.
 	State interface {
-		UpdateSyncState() error
 		SyncChanges() error
+		SyncFull() error
 	}

 	// Interval is the time between two regular sync runs.
@@ -91,15 +91,15 @@ func (s *StateSyncer) Run() {
 		return lib.RandomStagger(time.Duration(f) * d)
 	}

-Sync:
+FullSync:
 	for {
-		switch err := s.State.UpdateSyncState(); {
+		switch err := s.State.SyncFull(); {

-		// update sync status failed
+		// full sync failed
 		case err != nil:
 			s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)

-			// retry updating sync status after some time or when a consul
+			// retry full sync after some time or when a consul
 			// server was added.
 			select {

@@ -121,10 +121,8 @@ Sync:
 				return
 			}

-		// update sync status OK
+		// full sync OK
 		default:
-			// force-trigger sync to pickup any changes
-			s.triggerSync()

 			// do partial syncs until it is time for a full sync again
 			for {
@@ -140,7 +138,7 @@ Sync:
 				// 	}

 				case <-time.After(s.Interval + stagger(s.Interval)):
-					continue Sync
+					continue FullSync

 				case <-s.TriggerCh:
 					if s.Paused() {
diff --git a/agent/local/state.go b/agent/local/state.go
index 5058eed0b..b980c569f 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -538,9 +538,9 @@ func (l *State) Metadata() map[string]string {
 	return m
 }

-// UpdateSyncState does a read of the server state, and updates
+// updateSyncState does a read of the server state, and updates
 // the local sync status as appropriate
-func (l *State) UpdateSyncState() error {
+func (l *State) updateSyncState() error {
 	// 1. get all checks and services from the master
 	req := structs.NodeSpecificRequest{
 		Datacenter: l.config.Datacenter,
@@ -631,7 +631,6 @@ func (l *State) UpdateSyncState() error {
 	}

 	for id, rc := range remoteChecks {
-
 		lc := l.checks[id]

 		// If we don't have the check locally, deregister it
@@ -639,7 +638,7 @@ func (l *State) UpdateSyncState() error {
 			// The Serf check is created automatically and does not
 			// need to be deregistered.
 			if id == structs.SerfCheckID {
-				l.logger.Printf("Skipping remote check %q since it is managed automatically", id)
+				l.logger.Printf("[DEBUG] Skipping remote check %q since it is managed automatically", id)
 				continue
 			}

@@ -683,6 +682,21 @@ func (l *State) UpdateSyncState() error {
 	return nil
 }

+// SyncFull determines the delta between the local and remote state
+// and synchronizes the changes.
+func (l *State) SyncFull() error {
+	// note that we do not acquire the lock here since the methods
+	// we are calling will do that themselves.
+
+	// todo(fs): is it an issue that we do not hold the lock for the entire time?
+	// todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever
+	// todo(fs): was determined in the update step.
+	if err := l.updateSyncState(); err != nil {
+		return err
+	}
+	return l.SyncChanges()
+}
+
 // SyncChanges is used to scan the status our local services and checks
 // and update any that are out of sync with the server
 func (l *State) SyncChanges() error {
diff --git a/agent/local/state_test.go b/agent/local/state_test.go
index 89136e88f..5646462d6 100644
--- a/agent/local/state_test.go
+++ b/agent/local/state_test.go
@@ -7,8 +7,8 @@ import (
 	"testing"
 	"time"

-	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent"
+	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/local"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/agent/token"
@@ -20,7 +20,7 @@ import (

 func TestAgentAntiEntropy_Services(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()

@@ -113,8 +113,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 		InSync: true,
 	})

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	var services structs.IndexedNodeServices
 	req := structs.NodeSpecificRequest{
@@ -180,8 +181,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
 	// Remove one of the services
 	a.State.RemoveService("api")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	retry.Run(t, func(r *retry.R) {
 		if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
@@ -228,7 +230,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {

 func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()

@@ -275,8 +277,9 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -348,18 +351,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 		}
 		a.State.AddCheck(chk, "")

-		// todo(fs): data race
-		// func() {
-		// 	a.State.RLock()
-		// 	defer a.State.RUnlock()
-
-		// 	// Sync the service once
-		// 	if err := a.State.syncService("mysql"); err != nil {
-		// 		t.Fatalf("err: %s", err)
-		// 	}
-		// }()
-		// todo(fs): is this correct?
-		if err := a.State.SyncChanges(); err != nil {
+		if err := a.State.SyncFull(); err != nil {
 			t.Fatal("sync failed: ", err)
 		}

@@ -418,18 +410,7 @@ func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
 		}
 		a.State.AddCheck(chk2, "")

-		// todo(fs): data race
-		// func() {
-		// 	a.State.RLock()
-		// 	defer a.State.RUnlock()
-
-		// 	// Sync the service once
-		// 	if err := a.State.syncService("redis"); err != nil {
-		// 		t.Fatalf("err: %s", err)
-		// 	}
-		// }()
-		// todo(fs): is this correct?
-		if err := a.State.SyncChanges(); err != nil {
+		if err := a.State.SyncFull(); err != nil {
 			t.Fatal("sync failed: ", err)
 		}

@@ -522,9 +503,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
 	}
 	a.State.AddService(srv2, token)

-	// Trigger anti-entropy run and wait
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -569,8 +550,9 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {

 	// Now remove the service and re-sync
 	a.State.RemoveService("api")
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -619,7 +601,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {

 func TestAgentAntiEntropy_Checks(t *testing.T) {
 	t.Parallel()
-	a := &agent.TestAgent{Name: t.Name(), NoInitialSync: true}
+	a := &agent.TestAgent{Name: t.Name()}
 	a.Start()
 	defer a.Shutdown()

@@ -694,8 +676,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 		InSync: true,
 	})

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -769,8 +752,9 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
 	// Remove one of the checks
 	a.State.RemoveCheck("redis")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
@@ -857,9 +841,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	}
 	a.State.AddService(srv2, "root")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	{
@@ -928,9 +912,9 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
 	}
 	a.State.AddCheck(chk2, token)

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
@@ -975,8 +959,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {

 	// Now delete the check and wait for sync.
 	a.State.RemoveCheck("api-check")
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Verify that we are in sync
 	retry.Run(t, func(r *retry.R) {
 		req := structs.NodeSpecificRequest{
@@ -1090,8 +1076,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	}
 	a.State.AddCheck(check, "")

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that we are in sync
 	req := structs.NodeSpecificRequest{
@@ -1172,9 +1159,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 		}
 	}

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that the output was synced back to the agent's value.
 	if err := a.RPC("Health.NodeChecks", &req, &checks); err != nil {
@@ -1210,9 +1197,9 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
 	// Now make an update that should be deferred.
 	a.State.UpdateCheck("web", api.HealthPassing, "deferred")

-	// Trigger anti-entropy run and wait.
-	a.StartSync()
-	time.Sleep(200 * time.Millisecond)
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	// Verify that the output is still out of sync since there's a deferred
 	// update pending.
@@ -1272,8 +1259,9 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}

 	req := structs.NodeSpecificRequest{
 		Datacenter: "dc1",
@@ -1304,8 +1292,10 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}

-	// Trigger anti-entropy run and wait
-	a.StartSync()
+	if err := a.State.SyncFull(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Wait for the sync - this should have been a sync of just the node info
 	retry.Run(t, func(r *retry.R) {
 		if err := a.RPC("Catalog.NodeServices", &req, &services); err != nil {
diff --git a/agent/testagent.go b/agent/testagent.go
index 9b99cd647..1c4806d36 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -65,10 +65,6 @@ type TestAgent struct {
 	// Key is the optional encryption key for the LAN and WAN keyring.
 	Key string

-	// NoInitialSync determines whether an anti-entropy run
-	// will be scheduled after the agent started.
-	NoInitialSync bool
-
 	// dns is a reference to the first started DNS endpoint.
 	// It is valid after Start().
 	dns *DNSServer
@@ -175,9 +171,9 @@ func (a *TestAgent) Start() *TestAgent {
 			}
 		}
 	}
-	if !a.NoInitialSync {
-		a.Agent.StartSync()
-	}
+
+	// Start the anti-entropy syncer
+	a.Agent.StartSync()

 	var out structs.IndexedNodes
 	retry.Run(&panicFailer{}, func(r *retry.R) {
@@ -200,7 +196,7 @@ func (a *TestAgent) Start() *TestAgent {
 				r.Fatal(a.Name, "No leader")
 			}
 			if out.Index == 0 {
-				r.Fatal(a.Name, "Consul index is 0")
+				r.Fatal(a.Name, ": Consul index is 0")
 			}
 		} else {
 			req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
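
A minimal, self-contained sketch of the contract this patch introduces. It
is illustrative only, not Consul code: the fakeState type and the main()
driver are invented for this example. The point it demonstrates is that
SyncFull() runs the full sync synchronously (the now-unexported
updateSyncState computes the local/remote delta, then SyncChanges pushes
it), so a caller such as a test can check the returned error directly
instead of starting the background syncer with StartSync() and sleeping.

package main

import (
	"errors"
	"fmt"
)

// State mirrors the interface the StateSyncer consumes after this patch:
// SyncFull replaces UpdateSyncState and performs a full, synchronous sync.
type State interface {
	SyncFull() error
	SyncChanges() error
}

// fakeState is an invented stand-in for agent/local.State.
type fakeState struct {
	dirty  bool // whether the last delta computation found changes to push
	broken bool // simulate an unreachable server
}

// updateSyncState stands in for the delta computation against the servers.
func (s *fakeState) updateSyncState() error {
	if s.broken {
		return errors.New("remote state unavailable")
	}
	s.dirty = true
	return nil
}

// SyncChanges pushes whatever the update step marked as out of sync.
func (s *fakeState) SyncChanges() error {
	s.dirty = false
	return nil
}

// SyncFull chains the two steps the same way the patch does: compute the
// delta, then sync the changes. Errors surface synchronously to the caller.
func (s *fakeState) SyncFull() error {
	if err := s.updateSyncState(); err != nil {
		return err
	}
	return s.SyncChanges()
}

func main() {
	var s State = &fakeState{}

	// New test pattern: one synchronous call with an immediate error check,
	// replacing StartSync() plus time.Sleep(200 * time.Millisecond) and
	// most of the retry loops.
	if err := s.SyncFull(); err != nil {
		fmt.Println("err:", err)
		return
	}
	fmt.Println("in sync")
}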