Используя Kafka, мы можем легко масштабировать наше приложение по горизонтали для выполнения асинхронной разбивки на страницы в ElasticSearch.
Допустим, у вас есть индекс ElasticSearch из 1 000 000 документов, и вам нужно выполнить операцию с этими документами. Мы уже знаем, насколько дорого обходится глубокая подкачка в ElasticSearch, особенно index.max_result_window и выполнение поиска “from:”.
GET /_search { "from": 5, "size": 20, "query": { "match": { "user.id": "jay" } } }
Одним из способов преодоления этой проблемы является использование search_after. В этом случае ваш процесс становится синхронным, что означает, что вы не можете вызвать второй фрагмент данных, не получив результатов первого вызова, например:
GET /_search { "size": 10000, "query": { "match" : { "user.id" : "jay" } }, "sort": [ {"@timestamp": "asc"} ] }
результаты:
{ "took" : ..., "timed_out" : false, "_shards" : ..., "hits" : { "total" : ..., "max_score" : null, "hits" : [ ... { "_source" : ..., "sort" : [ 4098435132000 ] } ] } }
Как вы можете видеть, результаты дают вам значение сортировки (4098435132000), которое вы можете использовать во втором вызове “search_after”, чтобы получить следующий фрагмент в виде:
GET /_search { "size": 10000, "query": { "match" : { "user.id" : "jay" } }, "sort": [ {"@timestamp": "asc"} ], "search_after": [ 4098435132000 ] }
затем получите следующее значение сортировки и используйте его в своем следующем вызове.
Этот процесс последовательно асинхронен, что означает, что для прохождения 1 000 000 документов вам нужно вызывать один раз за другим.
Что может пойти не так?
- допустим, ваше приложение умирает или у него заканчивается память во время этих вызовов
- как узнать, с чего начать после это не удалось
- возможно, этот процесс очень медленный для вашего варианта использования приложения
Решение С Помощью “Среза”
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
GET /_search?scroll=1m { "slice": { "id":0, "max":100 }, "size": 10000, "sort": [ {"@timestamp": "asc"} ] }
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
# Elastic Search docker-compose -f docker-compose-es.yml up -d # Kafka docker-compose -f docker-compose-kafka.yml up -d
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
public void paginationProcess() { log.debug("paginationProcess called"); //call to ES and get the total count Response response = restHighLevelClient.getLowLevelClient() .performRequest(new Request("GET", String.format("%s/_count", INDEX_NAME))); ResponseCountDto responseCountDto = objectMapper.readValue(EntityUtils.toString(response.getEntity()), ResponseCountDto.class); log.debug("responseCountDto: {}", responseCountDto.getCount()); //let say you want to have a page size of 500 then count / 500 int max = responseCountDto.getCount() / 500; log.debug("count: {} , max: {}", responseCountDto.getCount(), max); //producer IntStream.range(0, max).forEach(i -> paginationBinder.paginationOut()// .send(MessageBuilder.withPayload(// PaginationDto.builder()// .id(i)//slice id .max(max)// let say i want to have page size of 500 then: count / 500 .build())// .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build())); }
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
@StreamListener(PaginationBinder.PAGINATION_IN) public void paginationProcess(@Payload PaginationDto paginationDto) { // Call ES log.debug("paginationProcess: {}", paginationDto); try { Request request = new Request("GET", String.format("%s/_search?scroll=1m", INDEX_NAME)); //sorted by localDateTime and slice by id and max as parameters request.setJsonEntity(String.format("{\"slice\":{\"id\":%s,\"max\":%s},\"size\":10000,\"sort\":[{\"localDateTime\":\"asc\"}]}", paginationDto.getId(), paginationDto.getMax())); Response response = restHighLevelClient.getLowLevelClient().performRequest(request); //do something with the response ... log.debug("response: {}", response); } catch (IOException e) { e.printStackTrace(); } }
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число В противном случае мы потеряем согласованность результата во время разбивки на страницы.
приложение.файл yml
spring: application: name: es-pagination cloud.stream: bindings: pagination-out: destination: pagination producer: partition-count: 10 pagination-in: destination: pagination group: ${spring.application.name}.pagination-group consumer: maxAttempts: 5 pagination-in-dlq: destination: paginationDLQ group: ${spring.application.name}.pagination-group kafka: streams: bindings: pagination-in: consumer: enableDlq: true dlqName: paginationDLQ autoCommitOnError: true autoCommitOffset: true binder: autoAddPartitions: true min-partition-count: 10 configuration: commit.interval.ms: 100 default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Избегайте поджаривания памяти
Если количество фрагментов больше, чем количество сегментов, фильтр фрагментов будет работать очень медленно при первых вызовах. Он имеет сложность O(N) и стоимость памяти, равную N битам на фрагмент, где N – общее количество документов в сегменте. После нескольких вызовов фильтр должен быть кэширован, а последующие вызовы должны выполняться быстрее, но вы должны ограничить количество запросов, выполняемых параллельно, чтобы избежать взрыва памяти.
Вы можете найти проект в моем аккаунте на Github: Вы можете найти проект в моем аккаунте на Github:
Ссылка: https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
Оригинал: “https://dev.to/ehsaniara/elasticsearch-parallel-pagination-by-kafka-304a”