Рубрики
Без рубрики

Дросселирование HTTP-запросов на выгружаемые ресурсы с помощью Vert.x

Привет, разработчики Java, я знаю, что вы заинтересованы в асинхронном программировании и я знаю, что ты святой… Помечено как vertx, async, java, регулирование.

Привет, разработчики Java, я знаю, что вы заинтересованы в асинхронном программировании и я знаю, что вы боретесь с такой парадигмой. Я знаю это, потому что я один из вас!

Давайте посмотрим, как справиться с проблемами ограничения скорости в волшебном асинхронном мире Java с помощью Vert.x.

Для тех, кто не знает, небольшое резюме:

Что такое Vert.x?

Vert.x – это многоязычная, управляемая событиями и неблокирующая платформа приложений.

Что такое “Ограничение скорости”?

Ограничение скорости – это набор политик, реализуемых веб-сервером, используемых для того, чтобы заставить клиента “ограничить” запросы от перегрузки.

Что такое pagedresources?

Страничные ресурсы – это ресурсы, разделенные на разные страницы для предотвращения перегрузки.

Мы будем работать с реальным примером реализации:

Мы фанатики музыки (как и я), мы пользователи Мы фанатики музыки (как и я), мы пользователи , и мы хотим получить его данные в нашем неблокирующем

Взглянув на discogs.com Ссылка на API Мы заметим, что — помимо спецификаций API — есть некоторая информация о формате данных, заголовках, управлении версиями и так далее. Сейчас нас интересуют два раздела: Ограничение скорости и Разбивка на страницы . Это те проблемы, с которыми нужно иметь дело.

Набросок нашей архитектуры

Каждая стрелка слева направо – это вызов метода, каждая стрелка справа налево – это Будущее.

Мы начнем со дна и пойдем вверх по реке.

Ограничение Скорости

Discogs разрешает, согласно документации, 25 запросов в минуту, таким образом, 25 будет нашим пределом скорости, а одна минута – это размер окна ограничения скорости.

Идея состоит в том, чтобы иметь интерфейс с методом под названием * execute *, который примет запрос (объект данных, представляющий запрос) в качестве параметра и вернет Будущее (потому что мы думаем асинхронно!) Буфера, то есть тело ответа. Тривиально и аккуратно.

interface Requests {
  Future **execute**(Request request);
}

Нам нужно установить задержку между запросами, следовательно, внутри реализации RequestExecutor мы будем использовать очередь для отделения вызовов execute от выполнения регулируемого запроса:

class ThrottledRequests implements Requests {

  ...

  @Override
  public Future execute(Request request) {
    try {
      Future future = Future.future();
      queue.put(new RequestContext(request, future));
      return future;        
    } catch (Exception e) {
      return Future.failedFuture(e);
    }
  }

  ...

}

RequestContext – это простой объект данных, который позволяет нам привязать запрос к его будущему.

Затем запрос должен быть получен из очереди и выполнен, мы сделаем это с помощью периодической задачи, запуская ее каждые N миллисекунд, где N – нефиксированное значение, вычисляемое из максимального количества запросов, которые нам разрешено выполнять в окне ограничения скорости.

Сначала мы начнем с оптимистичного 1 , поэтому вот наш конструктор ThrottledRequests :

public class ThrottledRequests implements Requests {
  private static final int RATE_LIMIT_WINDOW_SIZE = 60000;
  private final Logger log = getLogger(getClass());
  private final BlockingQueue queue = new LinkedBlockingQueue<>();
  private final AtomicLong executorId = new AtomicLong(0);
  private final AtomicLong actualDelay = new AtomicLong(100);
  private final HttpClient http;
  private final Vertx vertx;

  ThrottledRequests(Vertx vertx) {
    this.vertx = vertx;
    this.http = vertx.createHttpClient();
    long id = vertx.setPeriodic(actualDelay.get(), executor());
    this.executorId.set(id);
  }

  ...
}

Некоторые объяснения:

  • vertex : вам нужен экземпляр vertx для запуска периодического исполнителя.
  • фактическая задержка : это атомарная длина, представляющая фактическую задержку, поэтому, как мы уже говорили ранее, ее начальное значение равно 1.
  • executor() : основной метод класса, он создает экземпляр функции, которая будет выполнять ограниченные запросы.
  • executorId : нам нужно отследить идентификатор задачи исполнителя, чтобы остановить ее, когда фактическая задержка изменится. Это тоже атомарная длина

Давайте посмотрим, как обрабатывать выполнение асинхронных запросов:

private Handler executor() {
  return timerId -> {
    RequestContext context = queue.poll();
    if (context != null) {
      Request inputRequest = context.request;
      http.request(inputRequest.method(), inputRequest.options())
        .putHeader("User-Agent", "YourApp/0.1")
        .setFollowRedirects(true)
        .handler(response -> {
          response.bodyHandler(context.future::complete);
          checkAndUpdateRateLimit(response);
        })
        .end();
    }
  };
}

Полегче, а? На каждый ответный запрос мы завершаем будущее и… проверяем и обновляем Ограничение скорости (): согласно документации API, в каждом ответе, который мы получим от discogs , есть заголовок, который называется X-Discogs-Ratelimit , который сообщает нам, сколько запросов мы получили. можно сделать в окне ограничения скорости в 1 минуту. Прохладный.

private void checkAndUpdateRateLimit(HttpClientResponse response) {
  Optional.ofNullable(response.getHeader("X-Discogs-Ratelimit"))
    .map(Long::parseLong)
    .map(rateLimit -> rateLimit - 1)
    .map(reqPerMinute -> RATE_LIMIT_WINDOW_SIZE / reqPerMinute)
    .ifPresent(throttleDelay -> {
      if (throttleDelay != actualDelay.getAndSet(throttleDelay)) {
        vertx.cancelTimer(executorId.get());
        long id = vertx.setPeriodic(throttleDelay, executor());
        executorId.set(id);
      }
    });
}

Расчет довольно прост:

  • Проверьте заголовок ограничения скорости
  • Вычтите единицу, просто чтобы избежать заполнения окна
  • Рассчитайте задержку, которую вы должны предпринять между запросами, чтобы выполнить меньше запросов с предельной скоростью во временном окне (одна минута).
  • Если задержка отличается от текущей, мы удаляем старый таймер исполнителя и определяем другой с новым временем задержки.

Пагинация

Обработка разбивки на страницы немного сложнее, чем ограничение скорости, поскольку с фьючерсами предстоит проделать некоторую работу, вы знаете, общее количество страниц будет известно только после выборки первой страницы.

Идея состоит в том, чтобы разделить обязанности, мы реализуем три компонента: (пример приведен на ресурсе инвентаризации, который извлекает объекты списка)

  • GetListingPage: одна реализация для каждого ресурса, такой класс использует запросы в качестве соавтора, знает URL-адрес для вызова и знает, как десериализовать единственную выбранную страницу. Таким образом, его ответственность лежит на Странице.
  • Pages использует getResource Page для извлечения первой страницы и, в конечном итоге, остальных, присоединяется к ним. Его ответственность лежит на Страницах.
  • Клиент получает все страницы со страниц и извлекает из них объекты.

Давайте начнем со страницы Get Listing, это класс, который реализует интерфейс с помощью одного метода: apply

@Override
Future apply(String userId, Integer pageNumber) {
  log.info("Request {} inventory page {}", userId, pageNumber);
  String path = String.format("/users/%s/inventory?page=%d", userId, pageNumber);
  Request request = Request.get(path);

  Future future = Future.future();
  executor.execute(request).setHandler(async -> {
    if (async.succeeded()) {
      String json = async.result().toString();
      ListingPage page = Json.fromJson(json, ListingPage.class);
      future.complete(page);
    } else {
      future.fail(async.cause());
    }
  });

  return future;
}

Ничего особенного, выполните запрос и десериализуйте вывод JSON в класс Listing Page (класс, представляющий структуру ListingPage).

Возможно, более интересный класс – это Страницы. Общедоступным методом является getFor:

public Future> getFor(String userId) {
  Future> result = Future.*future*();

  getPage.apply(userId, 1).setHandler(async -> {
    if (async.succeeded()) {
      T firstPage = async.result();
      int totalPages = firstPage.pages();
      log.info("Total pages: {}", totalPages);

      List futures = IntStream
        .range(2, totalPages + 1)
        .mapToObj(page -> getPage.apply(userId, page))
        .collect(Collectors.toList());

      CompositeFuture.all(futures)
        .setHandler(joinPages(result,firstPage));
    }
  });

  return result;
}

Мы запросим первую страницу, а затем, зная общее количество страниц, мы запросим каждую вторую страницу, объединив все фьючерсы в CompositeFuture.

Когда каждое будущее будет завершено, мы присоединимся к вам:

private Handler> joinPages(
  Future> future, T firstPage
) {
  return async -> {
    if (async.succeeded()) {
      CompositeFuture remnants = async.result();
      Collection remnantPages = IntStream
        .range(0, remnants.size())
        .mapToObj(remnants::resultAt)
        .map(Page.class::cast)
        .collect(Collectors.toList());

      List total = new ArrayList<>();
      total.add(firstPage);
      total.addAll(remnantPages);

      future.complete(total);
    } else {
      future.fail(async.cause());
    }
  };
}

Теперь, на более высоком уровне (клиент), мы можем просто получать страницы и извлекать объекты:

listingPages.getFor(user).setHandler(async -> {
  if (async.succeeded()) {
    List listings = async.result().stream()
      .map(ListingPage::listings)
      .flatMap(Collection::stream)
      .collect(Collectors.*toList*()));

    ... do your logic ...

  }
});

Довольно круто, да?

Исходный код:

Оригинал: “https://dev.to/cherrychain/throttle-http-requests-on-paged-resources-with-vert-x-5cjm”