Привет, разработчики Java, я знаю, что вы заинтересованы в асинхронном программировании и я знаю, что вы боретесь с такой парадигмой. Я знаю это, потому что я один из вас!
Давайте посмотрим, как справиться с проблемами ограничения скорости в волшебном асинхронном мире Java с помощью Vert.x.
Для тех, кто не знает, небольшое резюме:
Что такое Vert.x?
Vert.x – это многоязычная, управляемая событиями и неблокирующая платформа приложений.
Что такое “Ограничение скорости”?
Ограничение скорости – это набор политик, реализуемых веб-сервером, используемых для того, чтобы заставить клиента “ограничить” запросы от перегрузки.
Что такое pagedresources?
Страничные ресурсы – это ресурсы, разделенные на разные страницы для предотвращения перегрузки.
Мы будем работать с реальным примером реализации:
Мы фанатики музыки (как и я), мы пользователи Мы фанатики музыки (как и я), мы пользователи , и мы хотим получить его данные в нашем неблокирующем
Взглянув на discogs.com Ссылка на API Мы заметим, что — помимо спецификаций API — есть некоторая информация о формате данных, заголовках, управлении версиями и так далее. Сейчас нас интересуют два раздела: Ограничение скорости и Разбивка на страницы . Это те проблемы, с которыми нужно иметь дело.
Набросок нашей архитектуры
Каждая стрелка слева направо – это вызов метода, каждая стрелка справа налево – это Будущее.
Мы начнем со дна и пойдем вверх по реке.
Ограничение Скорости
Discogs разрешает, согласно документации, 25 запросов в минуту, таким образом, 25 будет нашим пределом скорости, а одна минута – это размер окна ограничения скорости.
Идея состоит в том, чтобы иметь интерфейс с методом под названием * execute *, который примет запрос (объект данных, представляющий запрос) в качестве параметра и вернет Будущее (потому что мы думаем асинхронно!) Буфера, то есть тело ответа. Тривиально и аккуратно.
interface Requests { Future**execute**(Request request); }
Нам нужно установить задержку между запросами, следовательно, внутри реализации RequestExecutor мы будем использовать очередь для отделения вызовов execute от выполнения регулируемого запроса:
class ThrottledRequests implements Requests { ... @Override public Futureexecute(Request request) { try { Future future = Future.future(); queue.put(new RequestContext(request, future)); return future; } catch (Exception e) { return Future.failedFuture(e); } } ... }
RequestContext – это простой объект данных, который позволяет нам привязать запрос к его будущему.
Затем запрос должен быть получен из очереди и выполнен, мы сделаем это с помощью периодической задачи, запуская ее каждые N миллисекунд, где N – нефиксированное значение, вычисляемое из максимального количества запросов, которые нам разрешено выполнять в окне ограничения скорости.
Сначала мы начнем с оптимистичного 1 , поэтому вот наш конструктор ThrottledRequests :
public class ThrottledRequests implements Requests { private static final int RATE_LIMIT_WINDOW_SIZE = 60000; private final Logger log = getLogger(getClass()); private final BlockingQueuequeue = new LinkedBlockingQueue<>(); private final AtomicLong executorId = new AtomicLong(0); private final AtomicLong actualDelay = new AtomicLong(100); private final HttpClient http; private final Vertx vertx; ThrottledRequests(Vertx vertx) { this.vertx = vertx; this.http = vertx.createHttpClient(); long id = vertx.setPeriodic(actualDelay.get(), executor()); this.executorId.set(id); } ... }
Некоторые объяснения:
- vertex : вам нужен экземпляр vertx для запуска периодического исполнителя.
- фактическая задержка : это атомарная длина, представляющая фактическую задержку, поэтому, как мы уже говорили ранее, ее начальное значение равно 1.
- executor() : основной метод класса, он создает экземпляр функции, которая будет выполнять ограниченные запросы.
- executorId : нам нужно отследить идентификатор задачи исполнителя, чтобы остановить ее, когда фактическая задержка изменится. Это тоже атомарная длина
Давайте посмотрим, как обрабатывать выполнение асинхронных запросов:
private Handlerexecutor() { return timerId -> { RequestContext context = queue.poll(); if (context != null) { Request inputRequest = context.request; http.request(inputRequest.method(), inputRequest.options()) .putHeader("User-Agent", "YourApp/0.1") .setFollowRedirects(true) .handler(response -> { response.bodyHandler(context.future::complete); checkAndUpdateRateLimit(response); }) .end(); } }; }
Полегче, а? На каждый ответный запрос мы завершаем будущее и… проверяем и обновляем Ограничение скорости (): согласно документации API, в каждом ответе, который мы получим от discogs , есть заголовок, который называется X-Discogs-Ratelimit , который сообщает нам, сколько запросов мы получили. можно сделать в окне ограничения скорости в 1 минуту. Прохладный.
private void checkAndUpdateRateLimit(HttpClientResponse response) { Optional.ofNullable(response.getHeader("X-Discogs-Ratelimit")) .map(Long::parseLong) .map(rateLimit -> rateLimit - 1) .map(reqPerMinute -> RATE_LIMIT_WINDOW_SIZE / reqPerMinute) .ifPresent(throttleDelay -> { if (throttleDelay != actualDelay.getAndSet(throttleDelay)) { vertx.cancelTimer(executorId.get()); long id = vertx.setPeriodic(throttleDelay, executor()); executorId.set(id); } }); }
Расчет довольно прост:
- Проверьте заголовок ограничения скорости
- Вычтите единицу, просто чтобы избежать заполнения окна
- Рассчитайте задержку, которую вы должны предпринять между запросами, чтобы выполнить меньше запросов с предельной скоростью во временном окне (одна минута).
- Если задержка отличается от текущей, мы удаляем старый таймер исполнителя и определяем другой с новым временем задержки.
Пагинация
Обработка разбивки на страницы немного сложнее, чем ограничение скорости, поскольку с фьючерсами предстоит проделать некоторую работу, вы знаете, общее количество страниц будет известно только после выборки первой страницы.
Идея состоит в том, чтобы разделить обязанности, мы реализуем три компонента: (пример приведен на ресурсе инвентаризации, который извлекает объекты списка)
- GetListingPage: одна реализация для каждого ресурса, такой класс использует запросы в качестве соавтора, знает URL-адрес для вызова и знает, как десериализовать единственную выбранную страницу. Таким образом, его ответственность лежит на Странице.
- Pages использует getResource Page для извлечения первой страницы и, в конечном итоге, остальных, присоединяется к ним. Его ответственность лежит на Страницах.
- Клиент получает все страницы со страниц и извлекает из них объекты.
Давайте начнем со страницы Get Listing, это класс, который реализует интерфейс с помощью одного метода: apply
@Override Futureapply(String userId, Integer pageNumber) { log.info("Request {} inventory page {}", userId, pageNumber); String path = String.format("/users/%s/inventory?page=%d", userId, pageNumber); Request request = Request.get(path); Future future = Future.future(); executor.execute(request).setHandler(async -> { if (async.succeeded()) { String json = async.result().toString(); ListingPage page = Json.fromJson(json, ListingPage.class); future.complete(page); } else { future.fail(async.cause()); } }); return future; }
Ничего особенного, выполните запрос и десериализуйте вывод JSON в класс Listing Page (класс, представляющий структуру ListingPage).
Возможно, более интересный класс – это Страницы. Общедоступным методом является getFor:
public Future> getFor(String userId) { Future
> result = Future.*future*(); getPage.apply(userId, 1).setHandler(async -> { if (async.succeeded()) { T firstPage = async.result(); int totalPages = firstPage.pages(); log.info("Total pages: {}", totalPages); List
futures = IntStream .range(2, totalPages + 1) .mapToObj(page -> getPage.apply(userId, page)) .collect(Collectors.toList()); CompositeFuture.all(futures) .setHandler(joinPages(result,firstPage)); } }); return result; }
Мы запросим первую страницу, а затем, зная общее количество страниц, мы запросим каждую вторую страницу, объединив все фьючерсы в CompositeFuture.
Когда каждое будущее будет завершено, мы присоединимся к вам:
private Handler> joinPages( Future > future, T firstPage ) { return async -> { if (async.succeeded()) { CompositeFuture remnants = async.result(); Collection remnantPages = IntStream .range(0, remnants.size()) .mapToObj(remnants::resultAt) .map(Page.class::cast) .collect(Collectors.toList()); List
total = new ArrayList<>(); total.add(firstPage); total.addAll(remnantPages); future.complete(total); } else { future.fail(async.cause()); } }; }
Теперь, на более высоком уровне (клиент), мы можем просто получать страницы и извлекать объекты:
listingPages.getFor(user).setHandler(async -> { if (async.succeeded()) { Listlistings = async.result().stream() .map(ListingPage::listings) .flatMap(Collection::stream) .collect(Collectors.*toList*())); ... do your logic ... } });
Довольно круто, да?
Исходный код:
Оригинал: “https://dev.to/cherrychain/throttle-http-requests-on-paged-resources-with-vert-x-5cjm”