Недавно я работал над множеством высокопроизводительных производителей кафки. Наше приложение публикует около 3 миллионов публикаций кафки в день. (что все еще мало по сравнению с тем, с чем может справиться кафка)
На этом пути есть некоторые уроки по поддержанию таких производителей кафки:
- Выбирайте количество разделов с умом: количество разделов определяет, насколько потребители могут масштабироваться. Количество разделов – это степень параллелизма в кафке. Кафка передает данные одного раздела одному потоку.
Наше общее правило состоит в том, чтобы иметь разделы, равные количеству серверов-потребителей. Например, если у нас есть кластер из 20 серверов, использующих тему кафки, каждый сервер будет потреблять из одного раздела, поэтому 20 разделов.
Есть много других факторов, которые необходимо учитывать как объяснено здесь
Выберите согласованный ключ при публикации – Сообщения, опубликованные с одним и тем же ключом, будут опубликованы в одном разделе. Раздел – это логическая единица упорядочения сообщений. Поэтому, если для вас важен порядок сообщений, вам следует выбрать согласованный ключ для этих сообщений.
Используйте силу асинхронности – производитель Кафки по умолчанию асинхронен, если вы явно не используете блокирующий вызов. Это означает, что публикация кафки может завершиться неудачей, и ваш код уже прошел бы мимо метода публикации. Производитель Кафки обеспечивает обратный вызов после того, как сервер выполнил инструкцию публикации. В этом обратном вызове пользователь может проверить наличие сбоя и повторить попытку или отправить в очередь мертвых писем и т.д. Сам производитель Кафки повторяет попытку 3 раза, но я считаю, что этого слишком мало и недостаточно для приложений, критически важных для данных.
Ниже приведен примерный фрагмент такого производителя
@Autowired
@Qualifier("createKafkaSslProducerOrder")
Producer kafkaSslProducer;
public void publish(String messageKey, String payload, String topic) {
try {
ProducerRecord record = new ProducerRecord<>(topic, messageKey, payload);
kafkaSslProducer.send(record, (metadata, exception) -> {
if (Optional.ofNullable(exception).isPresent()) {
log.error("op={}, status=KO, desc={} and exception={}",
new Object[] { "KafkaProducer",
"Error posting message to kafka topic: " + topic,
exception.getMessage() });
// Send for re-processing
}
});
} catch (Exception ex) {
log.error("op={}, status=KO, desc=Error posting message to SSL kafka: {}, stackTrace={} ", LOG_OP_INFO, ex.getMessage(), ex);
// Re-throw the exception so that status can be recorded in the database.
}
}
В случае сообщений kafka полезно предоставить полную метку времени публикации и оригинальную метку времени изменения сообщения (например, запись в бд). Используя эти временные метки, клиент может определить, является ли входящее сообщение устаревшим или новым обновлением.
Первоначально, во время разработки, очень полезно хранить раздел и смещение потребляемых сообщений. Это может быть сохранено в хранилище потребительских данных или в журналах приложений. Используя эту информацию, сообщение можно напрямую просмотреть в kafka, чтобы увидеть исходное сообщение.
Оригинал: “https://dev.to/madhur/considerations-for-high-throughput-kafka-producer-3gjm”