Рубрики
Без рубрики

Соображения для производителя кафки с высокой производительностью

Соображения для производителя кафки с высокой пропускной способностью. Помеченный как java, кафка.

Недавно я работал над множеством высокопроизводительных производителей кафки. Наше приложение публикует около 3 миллионов публикаций кафки в день. (что все еще мало по сравнению с тем, с чем может справиться кафка)

На этом пути есть некоторые уроки по поддержанию таких производителей кафки:

  • Выбирайте количество разделов с умом: количество разделов определяет, насколько потребители могут масштабироваться. Количество разделов – это степень параллелизма в кафке. Кафка передает данные одного раздела одному потоку.

Наше общее правило состоит в том, чтобы иметь разделы, равные количеству серверов-потребителей. Например, если у нас есть кластер из 20 серверов, использующих тему кафки, каждый сервер будет потреблять из одного раздела, поэтому 20 разделов.

Есть много других факторов, которые необходимо учитывать как объяснено здесь

  • Выберите согласованный ключ при публикации – Сообщения, опубликованные с одним и тем же ключом, будут опубликованы в одном разделе. Раздел – это логическая единица упорядочения сообщений. Поэтому, если для вас важен порядок сообщений, вам следует выбрать согласованный ключ для этих сообщений.

  • Используйте силу асинхронности – производитель Кафки по умолчанию асинхронен, если вы явно не используете блокирующий вызов. Это означает, что публикация кафки может завершиться неудачей, и ваш код уже прошел бы мимо метода публикации. Производитель Кафки обеспечивает обратный вызов после того, как сервер выполнил инструкцию публикации. В этом обратном вызове пользователь может проверить наличие сбоя и повторить попытку или отправить в очередь мертвых писем и т.д. Сам производитель Кафки повторяет попытку 3 раза, но я считаю, что этого слишком мало и недостаточно для приложений, критически важных для данных.

Ниже приведен примерный фрагмент такого производителя

@Autowired
@Qualifier("createKafkaSslProducerOrder")
Producer kafkaSslProducer;

public void publish(String messageKey, String payload, String topic) {

    try {

        ProducerRecord record = new ProducerRecord<>(topic, messageKey, payload);

        kafkaSslProducer.send(record, (metadata, exception) -> {

            if (Optional.ofNullable(exception).isPresent()) {
                log.error("op={}, status=KO, desc={} and exception={}",
                        new Object[] { "KafkaProducer",
                                "Error posting message to kafka topic: " + topic,
                                exception.getMessage() });
                // Send for re-processing
            }

        });

    } catch (Exception ex) {
        log.error("op={}, status=KO, desc=Error posting message to SSL kafka: {}, stackTrace={} ", LOG_OP_INFO, ex.getMessage(), ex);
        // Re-throw the exception so that status can be recorded in the database.
    }
}
  • В случае сообщений kafka полезно предоставить полную метку времени публикации и оригинальную метку времени изменения сообщения (например, запись в бд). Используя эти временные метки, клиент может определить, является ли входящее сообщение устаревшим или новым обновлением.

  • Первоначально, во время разработки, очень полезно хранить раздел и смещение потребляемых сообщений. Это может быть сохранено в хранилище потребительских данных или в журналах приложений. Используя эту информацию, сообщение можно напрямую просмотреть в kafka, чтобы увидеть исходное сообщение.

Оригинал: “https://dev.to/madhur/considerations-for-high-throughput-kafka-producer-3gjm”