UI: Add blocking cursor validation and more straightforward throttle (#5470)
More recommendations for blocking queries clients was added here: https://github.com/hashicorp/consul/pull/5358 This commit mainly adds cursor/index validation/correction based on these recommendations (plus tests) The recommendations also suggest that clients should include rate limiting. Because of this, we've moved the throttling out of Consul UI specific code and into Blocking Query specific code. Currently the 'rate limiting' in this commit only adds a sleep to every iteration of the loop, which is not the recommended approach, but the code here organizes the throttling functionality into something we can work with later to provide something more apt.
This commit is contained in:
parent
f1c8db1447
commit
45a2fa5a01
|
@ -7,7 +7,6 @@ import { cache as createCache, BlockingEventSource } from 'consul-ui/utils/dom/e
|
|||
|
||||
const createProxy = function(repo, find, settings, cache, serialize = JSON.stringify) {
|
||||
// proxied find*..(id, dc)
|
||||
const throttle = get(this, 'wait').execute;
|
||||
const client = get(this, 'client');
|
||||
return function() {
|
||||
const key = `${repo.getModelName()}.${find}.${serialize([...arguments])}`;
|
||||
|
@ -16,49 +15,38 @@ const createProxy = function(repo, find, settings, cache, serialize = JSON.strin
|
|||
return newPromisedEventSource(
|
||||
function(configuration) {
|
||||
// take a copy of the original arguments
|
||||
// this means we don't have any configuration object on it
|
||||
let args = [..._args];
|
||||
if (configuration.settings.enabled) {
|
||||
// ...and only add our current cursor/configuration if we are blocking
|
||||
args = args.concat([configuration]);
|
||||
}
|
||||
// save a callback so we can conditionally throttle
|
||||
const cb = () => {
|
||||
// original find... with configuration now added
|
||||
return repo[find](...args)
|
||||
.then(res => {
|
||||
if (!configuration.settings.enabled) {
|
||||
// blocking isn't enabled, immediately close
|
||||
this.close();
|
||||
}
|
||||
return res;
|
||||
})
|
||||
.catch(function(e) {
|
||||
// setup the aborted connection restarting
|
||||
// this should happen here to avoid cache deletion
|
||||
const status = get(e, 'errors.firstObject.status');
|
||||
if (status === '0') {
|
||||
// Any '0' errors (abort) should possibly try again, depending upon the circumstances
|
||||
// whenAvailable returns a Promise that resolves when the client is available
|
||||
// again
|
||||
return client.whenAvailable(e);
|
||||
}
|
||||
throw e;
|
||||
});
|
||||
};
|
||||
// if we have a cursor (which means its at least the second call)
|
||||
// and we have a throttle setting, wait for so many ms
|
||||
if (typeof configuration.cursor !== 'undefined' && configuration.settings.throttle) {
|
||||
return throttle(configuration.settings.throttle).then(cb);
|
||||
}
|
||||
return cb();
|
||||
// original find... with configuration now added
|
||||
return repo[find](...args)
|
||||
.then(res => {
|
||||
if (!configuration.settings.enabled) {
|
||||
// blocking isn't enabled, immediately close
|
||||
this.close();
|
||||
}
|
||||
return res;
|
||||
})
|
||||
.catch(function(e) {
|
||||
// setup the aborted connection restarting
|
||||
// this should happen here to avoid cache deletion
|
||||
const status = get(e, 'errors.firstObject.status');
|
||||
if (status === '0') {
|
||||
// Any '0' errors (abort) should possibly try again, depending upon the circumstances
|
||||
// whenAvailable returns a Promise that resolves when the client is available
|
||||
// again
|
||||
return client.whenAvailable(e);
|
||||
}
|
||||
throw e;
|
||||
});
|
||||
},
|
||||
{
|
||||
key: key,
|
||||
type: BlockingEventSource,
|
||||
settings: {
|
||||
enabled: settings.blocking,
|
||||
throttle: settings.throttle,
|
||||
},
|
||||
}
|
||||
);
|
||||
|
|
|
@ -21,6 +21,26 @@ export const create5xxBackoff = function(ms = 3000, P = Promise, wait = setTimeo
|
|||
throw err;
|
||||
};
|
||||
};
|
||||
export const validateCursor = function(current, prev = null) {
|
||||
let cursor = parseInt(current);
|
||||
if (!isNaN(cursor)) {
|
||||
// if cursor is less than the current cursor, reset to zero
|
||||
if (prev !== null && cursor < prev) {
|
||||
cursor = 0;
|
||||
}
|
||||
// if cursor is less than 0, its always safe to use 1
|
||||
return Math.max(cursor, 1);
|
||||
}
|
||||
};
|
||||
const throttle = function(configuration, prev, current) {
|
||||
return function(obj) {
|
||||
return new Promise(function(resolve, reject) {
|
||||
setTimeout(function() {
|
||||
resolve(obj);
|
||||
}, 200);
|
||||
});
|
||||
};
|
||||
};
|
||||
const defaultCreateEvent = function(result, configuration) {
|
||||
return {
|
||||
type: 'message',
|
||||
|
@ -55,29 +75,32 @@ export default function(EventSource, backoff = create5xxBackoff()) {
|
|||
.apply(this, [superConfiguration])
|
||||
.catch(backoff)
|
||||
.then(result => {
|
||||
if (!(result instanceof Error)) {
|
||||
const _createEvent =
|
||||
typeof createEvent === 'function' ? createEvent : defaultCreateEvent;
|
||||
let event = _createEvent(result, configuration);
|
||||
// allow custom types, but make a default of `message`, ideally this would check for CustomEvent
|
||||
// but keep this flexible for the moment
|
||||
if (!event.type) {
|
||||
event = {
|
||||
type: 'message',
|
||||
data: event,
|
||||
};
|
||||
}
|
||||
// meta is also configurable by using createEvent
|
||||
const meta = get(event.data || {}, 'meta');
|
||||
if (meta) {
|
||||
// pick off the `cursor` from the meta and add it to configuration
|
||||
configuration.cursor = meta.cursor;
|
||||
}
|
||||
this.currentEvent = event;
|
||||
this.dispatchEvent(this.currentEvent);
|
||||
this.previousEvent = this.currentEvent;
|
||||
if (result instanceof Error) {
|
||||
return result;
|
||||
}
|
||||
return result;
|
||||
const _createEvent =
|
||||
typeof createEvent === 'function' ? createEvent : defaultCreateEvent;
|
||||
let event = _createEvent(result, configuration);
|
||||
// allow custom types, but make a default of `message`, ideally this would check for CustomEvent
|
||||
// but keep this flexible for the moment
|
||||
if (!event.type) {
|
||||
event = {
|
||||
type: 'message',
|
||||
data: event,
|
||||
};
|
||||
}
|
||||
// meta is also configurable by using createEvent
|
||||
const meta = get(event.data || {}, 'meta');
|
||||
if (meta) {
|
||||
// pick off the `cursor` from the meta and add it to configuration
|
||||
// along with cursor validation
|
||||
configuration.cursor = validateCursor(meta.cursor, configuration.cursor);
|
||||
}
|
||||
this.currentEvent = event;
|
||||
this.dispatchEvent(this.currentEvent);
|
||||
const throttledResolve = throttle(configuration, this.currentEvent, this.previousEvent);
|
||||
this.previousEvent = this.currentEvent;
|
||||
return throttledResolve(result);
|
||||
});
|
||||
}, configuration);
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ Feature: dc / list-blocking
|
|||
---
|
||||
consul:client:
|
||||
blocking: 1
|
||||
throttle: 200
|
||||
---
|
||||
Scenario:
|
||||
And 3 [Model] models
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import domEventSourceBlocking, {
|
||||
validateCursor,
|
||||
create5xxBackoff,
|
||||
} from 'consul-ui/utils/dom/event-source/blocking';
|
||||
import { module } from 'qunit';
|
||||
|
@ -85,3 +86,63 @@ test('the 5xx backoff returns a resolve promise on a 5xx (apart from 500)', func
|
|||
assert.ok(timeout.calledOnce, 'timeout was called once');
|
||||
});
|
||||
});
|
||||
test("the cursor validation always returns undefined if the cursor can't be parsed to an integer", function(assert) {
|
||||
['null', null, '', undefined].forEach(item => {
|
||||
const actual = validateCursor(item);
|
||||
assert.equal(actual, undefined);
|
||||
});
|
||||
});
|
||||
test('the cursor validation always returns a cursor greater than zero', function(assert) {
|
||||
[
|
||||
{
|
||||
cursor: 0,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
cursor: -10,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
cursor: -1,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
cursor: -1000,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
cursor: 10,
|
||||
expected: 10,
|
||||
},
|
||||
].forEach(item => {
|
||||
const actual = validateCursor(item.cursor);
|
||||
assert.equal(actual, item.expected, 'cursor is greater than zero');
|
||||
});
|
||||
});
|
||||
test('the cursor validation resets to 1 if its less than the previous cursor', function(assert) {
|
||||
[
|
||||
{
|
||||
previous: 100,
|
||||
cursor: 99,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
previous: 100,
|
||||
cursor: -10,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
previous: 100,
|
||||
cursor: 0,
|
||||
expected: 1,
|
||||
},
|
||||
{
|
||||
previous: 100,
|
||||
cursor: 101,
|
||||
expected: 101,
|
||||
},
|
||||
].forEach(item => {
|
||||
const actual = validateCursor(item.cursor, item.previous);
|
||||
assert.equal(actual, item.expected);
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue