Рубрики
Без рубрики

ElasticSearch Параллельная разбивка на страницы с помощью Kafka

Используя Kafka, мы можем легко масштабировать наше приложение по горизонтали для выполнения асинхронной разбивки на страницы в Elas… С тегами elasticsearch, kafka, spring cloud, java.

Используя Kafka, мы можем легко масштабировать наше приложение по горизонтали для выполнения асинхронной разбивки на страницы в ElasticSearch.

Допустим, у вас есть индекс ElasticSearch из 1 000 000 документов, и вам нужно выполнить операцию с этими документами. Мы уже знаем, насколько дорого обходится глубокая подкачка в ElasticSearch, особенно index.max_result_window и выполнение поиска “from:”.

GET /_search
{
  "from": 5,
  "size": 20,
  "query": {
    "match": {
      "user.id": "jay"
    }
  }
}

Одним из способов преодоления этой проблемы является использование search_after. В этом случае ваш процесс становится синхронным, что означает, что вы не можете вызвать второй фрагмент данных, не получив результатов первого вызова, например:

GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "jay"
    }
  },
  "sort": [ 
    {"@timestamp": "asc"}
  ]
}

результаты:

{
  "took" : ...,
  "timed_out" : false,
  "_shards" : ...,
  "hits" : {
    "total" : ...,
    "max_score" : null,
    "hits" : [
      ...
      {
        "_source" : ...,
        "sort" : [                                
          4098435132000
        ]
      }
    ]
  }
}

Как вы можете видеть, результаты дают вам значение сортировки (4098435132000), которое вы можете использовать во втором вызове “search_after”, чтобы получить следующий фрагмент в виде:

GET /_search
{
  "size": 10000,
  "query": {
    "match" : {
      "user.id" : "jay"
    }
  },
  "sort": [
    {"@timestamp": "asc"}
  ],
  "search_after": [                                
    4098435132000
  ]
}

затем получите следующее значение сортировки и используйте его в своем следующем вызове.

Этот процесс последовательно асинхронен, что означает, что для прохождения 1 000 000 документов вам нужно вызывать один раз за другим.

Что может пойти не так?

  • допустим, ваше приложение умирает или у него заканчивается память во время этих вызовов
  • как узнать, с чего начать после это не удалось
  • возможно, этот процесс очень медленный для вашего варианта использования приложения

Решение С Помощью “Среза”

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

GET /_search?scroll=1m
{
  "slice": {
     "id":0,
     "max":100  
  },
  "size": 10000,
  "sort": [
    {"@timestamp": "asc"}
  ]
}

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

# Elastic Search
docker-compose -f docker-compose-es.yml up -d
# Kafka
docker-compose -f docker-compose-kafka.yml up -d

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

public void paginationProcess() {
    log.debug("paginationProcess called");
    //call to ES and get the total count
    Response response = restHighLevelClient.getLowLevelClient()
    .performRequest(new Request("GET", String.format("%s/_count", INDEX_NAME)));

    ResponseCountDto responseCountDto = objectMapper.readValue(EntityUtils.toString(response.getEntity()), ResponseCountDto.class);

    log.debug("responseCountDto: {}", responseCountDto.getCount());

    //let say you want to have a page size of 500 then count / 500

    int max = responseCountDto.getCount() / 500;
    log.debug("count: {} , max: {}", responseCountDto.getCount(), max);

    //producer
    IntStream.range(0, max).forEach(i -> paginationBinder.paginationOut()//
    .send(MessageBuilder.withPayload(//
    PaginationDto.builder()//
    .id(i)//slice id
    .max(max)// let say i want to have page size of 500 then: count / 500
    .build())//
    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()));
}

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число

 @StreamListener(PaginationBinder.PAGINATION_IN)
    public void paginationProcess(@Payload PaginationDto paginationDto) {
        // Call ES
        log.debug("paginationProcess: {}", paginationDto);
        try {
            Request request = new Request("GET", String.format("%s/_search?scroll=1m", INDEX_NAME));
            //sorted by localDateTime and slice by id and max as parameters
            request.setJsonEntity(String.format("{\"slice\":{\"id\":%s,\"max\":%s},\"size\":10000,\"sort\":[{\"localDateTime\":\"asc\"}]}", paginationDto.getId(), paginationDto.getMax()));
            Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
            //do something with the response ...
            log.debug("response: {}", response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число В противном случае мы потеряем согласованность результата во время разбивки на страницы.

приложение.файл yml

spring:
  application:
    name: es-pagination

  cloud.stream:
    bindings:

      pagination-out:
        destination: pagination
        producer:
          partition-count: 10
      pagination-in:
        destination: pagination
        group: ${spring.application.name}.pagination-group
        consumer:
          maxAttempts: 5
      pagination-in-dlq:
        destination: paginationDLQ
        group: ${spring.application.name}.pagination-group

    kafka:
      streams:
        bindings:
          pagination-in:
            consumer:
              enableDlq: true
              dlqName: paginationDLQ
              autoCommitOnError: true
              autoCommitOffset: true
        binder:
          autoAddPartitions: true
          min-partition-count: 10
          configuration:
            commit.interval.ms: 100
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

Избегайте поджаривания памяти

Если количество фрагментов больше, чем количество сегментов, фильтр фрагментов будет работать очень медленно при первых вызовах. Он имеет сложность O(N) и стоимость памяти, равную N битам на фрагмент, где N – общее количество документов в сегменте. После нескольких вызовов фильтр должен быть кэширован, а последующие вызовы должны выполняться быстрее, но вы должны ограничить количество запросов, выполняемых параллельно, чтобы избежать взрыва памяти.

Вы можете найти проект в моем аккаунте на Github: Вы можете найти проект в моем аккаунте на Github:

Ссылка: https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html

Оригинал: “https://dev.to/ehsaniara/elasticsearch-parallel-pagination-by-kafka-304a”