Выберите правильный метод async для пакетной обработки для максимальных запросов / сек.

Мне нужно выполнить циклический вызов некоторого внешнего API с некоторой задержкой, чтобы предотвратить ограничение «Ограничение скорости пользователя превысило».

API геокодирования Google Maps чувствителен к «req / sec», что позволяет использовать 10 req / sec. Я должен сделать геокодирование для сотен моих контактов, и такая задержка требуется. Итак, мне нужно иметь 10 асинхронных функций геокодирования с пост-задержкой за 1 секунду для каждого. Таким образом, я собираю все контакты в массиве, а затем прохожу через массив с помощью async.

Как правило, мне нужно иметь N одновременных streamов с задержкой в ​​D msecs в конце каждого streamа. Целый цикл выполняет итерацию по массиву объектов пользователя. Каждый stream обрабатывает единый объект, как обычно.

Я предполагаю, что у меня есть код вроде:

const N = 10; # threads count const D = 1000; # delay after each execution var processUser = function(user, callback){ someBusinessLogicProc(user, function(err) { setTimeout(function() { return callback(err); }, D); }); } var async = require('async') ; var people = new Array(900); async.batchMethod(people, processUser, N, finalCallback); 

В этом псевдокоде batchMethod – это метод, который я прошу.

Наложение задержки на результаты не совсем то, что вы хотите. Вместо этого вы хотите отслеживать, что вы отправили, и когда вы отправили его, и как только вы попадаете под границы запросов в секунду, вы можете отправить другой запрос.


Вот общая концепция функции, которая будет контролировать ограничение скорости для вас до фиксированного количества запросов в секунду. Это использует обещания и требует, чтобы вы предоставляли функцию запроса, которая возвращает promise (если вы не используете обещания сейчас, вам просто нужно обернуть функцию запроса в promise).

 // pass the following arguments: // array - array of values to iterate // requestsPerSec - max requests per second to send (integer) // maxInFlight - max number of requests in process at a time // fn - function to process an array value // function is passed array element as first argument // function returns a promise that is resolved/rejected when async operation is done // Returns: promise that is resolved with an array of resolves values // or rejected with first error that occurs function rateLimitMap(array, requestsPerSec, maxInFlight, fn) { return new Promise(function(resolve, reject) { var index = 0; var inFlightCntr = 0; var doneCntr = 0; var launchTimes = []; var results = new Array(array.length); // calculate num requests in last second function calcRequestsInLastSecond() { var now = Date.now(); // look backwards in launchTimes to see how many were launched within the last second var cnt = 0; for (var i = launchTimes.length - 1; i >= 0; i--) { if (now - launchTimes[i] < 1000) { ++cnt; } else { break; } } return cnt; } function runMore() { while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) { (function(i) { ++inFlightCntr; launchTimes.push(Date.now()); fn(array[i]).then(function(val) { results[i] = val; --inFlightCntr; ++doneCntr; runMore(); }, reject); })(index); ++index; } // see if we're done if (doneCntr === array.length) { resolve(results); } else if (launchTimes.length > requestsPerSec) { // calc how long we have to wait before sending more var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]); if (delta > 0) { setTimeout(runMore, delta); } } } runMore(); }); } 

Пример использования:

 rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) { // process array of results here }, function(err) { // process error here }); 

Общая идея этого кода заключается в следующем:

  1. Вы передаете массив для повторения
  2. Он возвращает promise, которое разрешено значением, представляет собой массив результатов (по порядку)
  3. Вы передаете максимальное количество запросовPerSec, чтобы когда-либо попасть
  4. Вы одновременно отправляете максимальное количество запросов в полете
  5. Вы передаете функцию, которой будет передан элемент из массива, который повторяется, и должен вернуть promise
  6. Он хранит массив временных меток, когда последний запрос был отправлен.
  7. Чтобы узнать, может ли быть отправлен другой запрос, он смотрит назад в массив и подсчитывает, сколько запросов было отправлено за последнюю секунду.
  8. Если это число меньше порогового значения, оно посылает другое.
  9. Если это число соответствует пороговому значению, оно подсчитывает, сколько времени вам нужно ждать, чтобы отправить еще один, и установить таймер на этот промежуток времени.
  10. По завершении каждого запроса он проверяет, может ли он отправлять больше
  11. Если какая-либо просьба отвергает свое promise, то возвращенное promise немедленно отклоняется. Если вы не хотите, чтобы он остановился при первой ошибке, измените переданную функцию, чтобы не отклонять, но разрешите с некоторым значением, которое позже можно определить как неудавшийся запрос при обработке результатов.

Вот работая симуляция: https://jsfiddle.net/jfriend00/3gr0tq7k/

Примечание. Если значение maxInFlight вы передаете, больше, чем значение maxInFlight , тогда эта функция будет в основном просто отправлять запросыPerSec, а затем через одну секунду отправлять другие запросы requestPerSec, поскольку это самый быстрый способ оставаться под границей requestPerSec. Если значение maxInFlight одинаков или меньше, чем maxInFlight тогда он отправит requestsPerSec а затем по мере завершения каждого запроса он увидит, может ли он отправить другой.

  • MongoDB - получать документы с максимальным атрибутом для каждой группы в коллекции
  • Изменить расположение узла node_modules
  • Преобразование документов Mongoose в json
  • Что такое «подписанные» куки в connect / expressjs?
  • Socket.IO - как мне получить список подключенных сокетов / клиентов?
  • Как я могу сделать несколько проектов совместно с каталогом node_modules?
  • nodejs требуется внутри файла TypeScript
  • Как автоматически перезагрузить файлы в Node.js?
  • Как вы извлекаете данные POST в Node.js?
  • Угловой $ resource delete не отправит тело на сервер express.js
  • Ошибка: шаг «brew link» не завершился успешно
  • Давайте будем гением компьютера.