Используя Kafka, мы можем легко масштабировать наше приложение по горизонтали для выполнения асинхронной разбивки на страницы в ElasticSearch.
Допустим, у вас есть индекс ElasticSearch из 1 000 000 документов, и вам нужно выполнить операцию с этими документами. Мы уже знаем, насколько дорого обходится глубокая подкачка в ElasticSearch, особенно index.max_result_window и выполнение поиска “from:”.
GET /_search
{
"from": 5,
"size": 20,
"query": {
"match": {
"user.id": "jay"
}
}
}
Одним из способов преодоления этой проблемы является использование search_after. В этом случае ваш процесс становится синхронным, что означает, что вы не можете вызвать второй фрагмент данных, не получив результатов первого вызова, например:
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "jay"
}
},
"sort": [
{"@timestamp": "asc"}
]
}
результаты:
{
"took" : ...,
"timed_out" : false,
"_shards" : ...,
"hits" : {
"total" : ...,
"max_score" : null,
"hits" : [
...
{
"_source" : ...,
"sort" : [
4098435132000
]
}
]
}
}
Как вы можете видеть, результаты дают вам значение сортировки (4098435132000), которое вы можете использовать во втором вызове “search_after”, чтобы получить следующий фрагмент в виде:
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "jay"
}
},
"sort": [
{"@timestamp": "asc"}
],
"search_after": [
4098435132000
]
}
затем получите следующее значение сортировки и используйте его в своем следующем вызове.
Этот процесс последовательно асинхронен, что означает, что для прохождения 1 000 000 документов вам нужно вызывать один раз за другим.
Что может пойти не так?
- допустим, ваше приложение умирает или у него заканчивается память во время этих вызовов
- как узнать, с чего начать после это не удалось
- возможно, этот процесс очень медленный для вашего варианта использования приложения
Решение С Помощью “Среза”
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
GET /_search?scroll=1m
{
"slice": {
"id":0,
"max":100
},
"size": 10000,
"sort": [
{"@timestamp": "asc"}
]
}
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
# Elastic Search docker-compose -f docker-compose-es.yml up -d # Kafka docker-compose -f docker-compose-kafka.yml up -d
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
public void paginationProcess() {
log.debug("paginationProcess called");
//call to ES and get the total count
Response response = restHighLevelClient.getLowLevelClient()
.performRequest(new Request("GET", String.format("%s/_count", INDEX_NAME)));
ResponseCountDto responseCountDto = objectMapper.readValue(EntityUtils.toString(response.getEntity()), ResponseCountDto.class);
log.debug("responseCountDto: {}", responseCountDto.getCount());
//let say you want to have a page size of 500 then count / 500
int max = responseCountDto.getCount() / 500;
log.debug("count: {} , max: {}", responseCountDto.getCount(), max);
//producer
IntStream.range(0, max).forEach(i -> paginationBinder.paginationOut()//
.send(MessageBuilder.withPayload(//
PaginationDto.builder()//
.id(i)//slice id
.max(max)// let say i want to have page size of 500 then: count / 500
.build())//
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()));
}
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число
@StreamListener(PaginationBinder.PAGINATION_IN)
public void paginationProcess(@Payload PaginationDto paginationDto) {
// Call ES
log.debug("paginationProcess: {}", paginationDto);
try {
Request request = new Request("GET", String.format("%s/_search?scroll=1m", INDEX_NAME));
//sorted by localDateTime and slice by id and max as parameters
request.setJsonEntity(String.format("{\"slice\":{\"id\":%s,\"max\":%s},\"size\":10000,\"sort\":[{\"localDateTime\":\"asc\"}]}", paginationDto.getId(), paginationDto.getMax()));
Response response = restHighLevelClient.getLowLevelClient().performRequest(request);
//do something with the response ...
log.debug("response: {}", response);
} catch (IOException e) {
e.printStackTrace();
}
}
Один вызов выполняется для получения количества документов (которое в данном случае равно 1 м), затем создайте производителя Kafka, чтобы поместить последовательности (0-100) в вашу тему Kafka и определить размер вашего раздела как число В противном случае мы потеряем согласованность результата во время разбивки на страницы.
приложение.файл yml
spring:
application:
name: es-pagination
cloud.stream:
bindings:
pagination-out:
destination: pagination
producer:
partition-count: 10
pagination-in:
destination: pagination
group: ${spring.application.name}.pagination-group
consumer:
maxAttempts: 5
pagination-in-dlq:
destination: paginationDLQ
group: ${spring.application.name}.pagination-group
kafka:
streams:
bindings:
pagination-in:
consumer:
enableDlq: true
dlqName: paginationDLQ
autoCommitOnError: true
autoCommitOffset: true
binder:
autoAddPartitions: true
min-partition-count: 10
configuration:
commit.interval.ms: 100
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Избегайте поджаривания памяти
Если количество фрагментов больше, чем количество сегментов, фильтр фрагментов будет работать очень медленно при первых вызовах. Он имеет сложность O(N) и стоимость памяти, равную N битам на фрагмент, где N – общее количество документов в сегменте. После нескольких вызовов фильтр должен быть кэширован, а последующие вызовы должны выполняться быстрее, но вы должны ограничить количество запросов, выполняемых параллельно, чтобы избежать взрыва памяти.
Вы можете найти проект в моем аккаунте на Github: Вы можете найти проект в моем аккаунте на Github:
Ссылка: https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
Оригинал: “https://dev.to/ehsaniara/elasticsearch-parallel-pagination-by-kafka-304a”