Рубрики
Без рубрики

Como implementar Тема с мертвой буквой com Весна Кафка

Dead-letter Topic, Dead-letter Queue ” или ” в старые добрые португальский: Темы, сообщения не доставляются…. С тегом кафка, кафка, ява, весна.

Тема с мертвыми письмами , Очередь с мертвыми письмами или в старый добрый португальский: Темы сообщения не доставляются . Являются темы, необходимые в распределенных системах, где связь является асинхронным, так и через брокеров, таких как Кафка.

Данные, которые поступают на эти темы прошли через все попытки возможных для обработки ошибок, и уже не остается ничего сделано, если бы не вмешательство человека. Assim, nao серия qualquer gato que llegara mensagens ou eventos сыворотка для публикадо, ум топико мертвая буква .

В мире Кафки тему dead-letter предназначена для записей, уничтожены, что по какой-то неустранимой ошибке и не могли быть успешно обработаны.

Не артиго сера демонстративно ума отменяет пункт реализации DLT без Кафки, использует Java spring. Для нетерпеливых 🧐 , эти источники:

фабио хосе/весна-кафка-dlt-бывший

Тема с мертвой буквой com Весна Кафка

Ошибки классификации

Перед тем, как написать какую-либо строку кода необходимо классифицировать ошибки и как будут лечить.

Существует два типа ошибок:

  • восстанавливать
  • не-восстанавливаемых

Ошибки восстанавливаемых или, которые могут быть обработаны, являются те, где-то подход будет горничная, чтобы попытаться завершить поток успешно.

Например, в Java, ошибки ConnectException и UnknownHostException могут быть переработаны с помощью retentativas, потому что, как правило, являются причиной неустойчивости однократно в сервис или в сети.

Уже не-восстанавливаемых те, что независимо от того, что было сделано, будет невозможно завершить поток успешно. Вскоре, эти кандидаты следовать непосредственно к теме dead-letter. Экомо пример, се вокет, Авро, о эро Автоматическое исключение пропущенного поля указывает на отсутствие поля, необходимого, не имеющие ничего общего. Поэтому будет недоступен для повторной попытки, например.

И помимо технических ошибок, вы также должны сортировать свои ошибки бизнеса и выбрать стратегию, чтобы лечить их.

Хорошо, теперь мы рассмотрим, как реализовать DLT за технических проблем на Java. И вполне вероятно, что вы встретите эквиваленты на его языке или framework.

Восстанавливать

Ниже приведен список технические ошибки, извлекаемыми, что, во имя простоты, будут обработаны через retentativas.

  • ConnectException
  • UnknownHostException
  • ProductNotFoundException : к примеру, эта ошибка бизнес был определен как лечить. Поскольку услуга называется Каталог , что делает event-sourcing изменений, выданных службой Продукции , еще не обработал событие товара. Но через повторной попытки его поток можно завершить успешно.

Не-восстанавливаемых

Вот некоторые из ошибок, не обратимы, то есть, если, проходящих через один и тот же процесс retentativas только consumiriam ресурсов, без шансов завершить успешно.

Выполнение

Имеется хорошая статья в блог инженеров Uber, которая описывает архитектуру для dead-letter. Стоит читать!

Эта реализация была разделена на три части:

  • топика
  • строительство
  • лечение

Топика

Для каждого процесс, который придется лечить dead-letter будет создан раздел с суффиксом -dlt . Возьмем в качестве примера процесс, который осуществляет бронирование, акции от заказов, опубликованные в теме порядок покупки . Поэтому, вместо того, чтобы создавать темы порядке-куплю-dlq, используется имя соответствующего процесса, происходит сбой,:

  • заказать шток-dlt

Электронная почта os tópicos para retentativas, вопрос о необходимости проведения форума quantos на заключительном этапе регистрации в чегаре ао топико мертвое письмо . Скажем, что будет максимум четыре retentativas кроме того, начальный:

  • заказать шток-retry-1
  • заказать шток-retry-2
  • заказать шток-retry-3
  • заказать шток-retry-4

Разделы должны иметь характеристики, которые не помешают в ходе retentativas или в публикации, на тему dead-letter. Пример конфигурации Пример конфигурации , que pode acarretar um ошибка шамадо сообщение слишком большое

Строительство

Здание было сделано с Spring Kafka, потому что он уже поставляется с множеством утилит для лечения dead-letter , что очень помогает производительности, является ли она сосредоточиться на своем проекте. Но ничто не мешает вам написать одно и то же решение с Клиентами Кафка на Java или на ваш язык вашего проекта.

Чтобы использовать ресурсы, предназначенные для dead-letter, необходимо настроить фабрику объектов поручено производить listeners Кафка без весны.

Таким образом, конфигурация программного был под-разделить на три части:

  • решить отвечает за определение целевой раздел реестра, который находится в контексте ошибки.
  • errorHandler : обработчик ошибки
  • kafkaListernerContainerFactory : производит экземпляров, которые используются в методах, аннотированный с @KafkaListener

И для каждого из подразделений имеется реализации Main и другие Retry . Как можно себе представить, является уход за параметров для основной обработки, другой для retentativas.

Main решения , ответственность за определение того, какой раздел назначения, основанные на ошибки-корень, выпустила на регистрацию употреблять в теме заказ-на покупку :

@Bean
public BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition >
 mainResolver() {

  return new BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > () {
   @Override
   public TopicPartition apply(ConsumerRecord < ? , ? > r, Exception e) {

    // ####
    // Por padrão, quando é não-recuperável, segue diretamente p/ dead-letter
    TopicPartition result =
     new TopicPartition(dltTopic,
      QUALQUER_PARTICAO);

    // ####
    // Trata-se de um erro recuperável?
    final boolean recuperavel = isRecuperavel(e);
    if (recuperavel) {

     Optional < String > origem =
      topicoOrigem(r.headers())
      .or(() -> Optional.of(NENHUM_CABECALHO));

     // ####
     // Se origem for outro tópico, segue para o primeiro retry
     String destino =
      origem
      .filter(topico -> !topico.matches(retryTopicsPattern))
      .map(t -> retryFirstTopic)
      .orElse(dltTopic);

     result = new TopicPartition(destino, QUALQUER_PARTICAO);
    }

    return result;
   }
  };
 }

Main error handler , который использует Главный преобразователь и отвечает за два параметра, необходимых для обработки ошибок:

  • DeadLetterPublishingRecoverer : запуск потока DLT, который является делегатом по SeekToCurrentErrorHandler если retentativas местах, не устранило ошибку.
  • SeekToCurrentErrorHandler : обрабатывает любые ошибки, будет запущен метод, аннотированный с @KafkaListener . Конечно, если вы захватить их с catch и не позволяет им перерастут в стек , нельзя относиться к ним.
@Bean
public SeekToCurrentErrorHandler mainErrorHandler(
 @Qualifier("mainResolver")
 BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > resolver,
 KafkaTemplate < ? , ? > template) {

 // ####
 // Recuperação usando dead-letter 
 DeadLetterPublishingRecoverer recoverer =
  new DeadLetterPublishingRecoverer(template, resolver);

 // ####
 // Tentar 3x localmente antes de iniciar o fluxo dead-letter
 SeekToCurrentErrorHandler handler =
  new SeekToCurrentErrorHandler(recoverer, RETENTAR_3X);

 // ####
 // Lista das exceções não-recuperáveis, para evitar o retry local
 excecoes.getNaoRecuperavies().forEach(e ->
  handler.addNotRetryableException(e));

 return handler;
}

Main listener factory , поручено изготовить потребителей, которые обрабатывают записи tópíco порядке,-покупки .

@Bean
public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, GenericRecord >>
 mainKafkaListenerContainerFactory(
  @Qualifier("mainErrorHandler") SeekToCurrentErrorHandler errorHandler,
  KafkaProperties properties,
  ConsumerFactory < String, GenericRecord > factory) {

  ConcurrentKafkaListenerContainerFactory < String, GenericRecord > listener =
   new ConcurrentKafkaListenerContainerFactory < > ();

  listener.setConsumerFactory(factory);

  // ####
  // Utilizando o mainErrorHandler para tratar os erros
  listener.setErrorHandler(errorHandler);

  // Falhar, caso os tópicos não existam?
  listener.getContainerProperties()
   .setMissingTopicsFatal(missingTopicsFatal);

  // Commit do offset no registro, logo após processá-lo no listener
  listener.getContainerProperties().setAckMode(AckMode.MANUAL);

  // Commits síncronos
  listener.getContainerProperties().setSyncCommits(Boolean.TRUE);

  return listener;
 }

Так что просто аннотировать метод, как показано ниже.

Но стоит отметить, что смещение всегда должна быть подтверждена, так как весь поток, который обрабатывает ошибки будет сделано повторной попытки и, возможно, кульминацией в разделе dead-letter. Проанализируйте свой случай-польза и поймите, если такой подход также применяется.

@KafkaListener(
 id = "main-kafka-listener",
 topics = "${app.kafka.consumer.topics}",
 containerFactory = "mainKafkaListenerContainerFactory"
)
public void consume(@Payload ConsumerRecord < String, GenericRecord > record,
 Acknowledgment ack) throws Exception {

 try {

  // #### 
  // Processar
  process(record);

 } finally {

  // ####
  // Sempre confirmar o offset
  ack.acknowledge();
 }

}

Уже в конфигурации для обработки retentativas ниже форм подобных, с некоторыми исключениями:

  • устранить возвращает которой следующая тема в последовательности retentativas или, если это забронировать-акции-dlt , если у них уже пройденные все.
  • errorHandler : нет цепкой месте.

Это метод, аннотированный с @KafkaListener что будет рассматривать все темы забронировать-акции-retry :

@KafkaListener(
 id = "retry-kafka-listener",
 topicPattern = "${app.kafka.dlt.retry.topics.pattern}",
 containerFactory = "retryKafkaListenerContainerFactory",
 properties = {
  "fetch.min.bytes=${app.kafka.dlt.retry.min.bytes}",
  "fetch.max.wait.ms=${app.kafka.dlt.retry.max.wait.ms}"
 }
)
public void retry(@Payload ConsumerRecord < String, GenericRecord > record,
 Acknowledgment ack) throws Exception {

 try {

  // ####
  // Reprocessar
  process(record);

 } finally {
  // ####
  // Sempre confirmar o offset
  ack.acknowledge();
 }

}
  • topicPattern : потреблять все темы забронировать-акции-retry
  • выборка.мин.байт e fetch.max.wait.ms используются, чтобы вызвать некоторую задержку в потреблении из записи, видно, что без них потребление будет практически мгновенный.

Наконец, application.properties да будет так:

app.kafka.dlt.retry.topics=4
app.kafka.dlt.retry.topics.pattern=reservar-estoque-retry-[0-9]+
app.kafka.dlt.retry.topic.first=reservar-estoque-retry-1
app.kafka.dlt.topic=reservar-estoque-dlt

# Lista de exceções recuperáveis
app.kafka.dlt.excecoes.recuperaveis[0]=java.net.ConnectException
app.kafka.dlt.excecoes.recuperaveis[1]=java.net.UnknownHostException

# Lista de exceções não-recuperáveis
app.kafka.dlt.excecoes.naoRecuperaveis[0]=org.apache.avro.AvroMissingFieldException
app.kafka.dlt.excecoes.naoRecuperaveis[1]=java.lang.NullPointerException

# Provocar atraso no processmento de retentativa
app.kafka.dlt.retry.max.wait.ms=20000
app.kafka.dlt.retry.min.bytes=52428800

Всего реализация доступен на Github, клонируйте его и получайте удовольствие!

фабио хосе/весна-кафка-dlt-бывший

Тема с мертвой буквой com Весна Кафка

Dead-letter Бы с Spring Кафка и формат данных Avro.

Заявление

  • JDK 1.8
  • Apache Maven 3.6+
  • Докер 19+
  • Доступ к хранилищу Доступ к хранилищу или альтернативы с бесплатным зависимостей, имеющихся в или альтернативы с бесплатным зависимостей, имеющихся в
  • Реестр схем
  • Кафка
  • Тестовые контейнеры
  • Ломбок

Конфигурация

Не волнуйтесь, потому что несмотря на то, что есть и ярлыки по variávies среды, вы можете спокойно использовать то, что Spring Boot предлагает. Тогда смотрите все свойства Тогда смотрите все свойства

В случае, если Кафка, используем Spring, Kafka, то можно использовать режим Spring для настройки.

Переменные Среды

  • APP_KAFKA_CONSUMER_TOPICS : разделы, чтобы потреблять, или выражение.

  • КАФКА_КЛИЕНТ_ИД : : имя клиента Кафки, usado pelos брокеры в лос-э-метрикас. Использование имя clean

    • , а не общий. spring.кафка.производитель.идентификатор клиента ,
  • spring.кафка.потребитель.идентификатор клиента KAFKA_CONSUMER_GROUP

    • : имя группы потребления, что это приложение принадлежит
  • KAFKA_CONSUMER_GROUP : имя группы потребления, что это приложение принадлежит

    • KAFKA_BOOTSTRAP_SERVERS
  • : список брокеров для кластера Kafka кластера Kafka

    • SCHEMA_REGISTRY_URL
  • : url-адрес для записи схемы Avro

Когда запись достигает последней темы от повторной попытки, в этом случае забронировать-акции-4 и не будет обработан успешно, и, наконец, он будет опубликован в разделе dead-letter , то команда должна быть готова для лечения.

Лечение

Ну, на данный момент запись с проблемами уже приземлился в разделе забронировать-акции-dlt и еще до того, как произойдет точная система для мониторинга темы, должны быть предупреждены об использовании разделов для retentativas, особенно если записи достигают в забронировать-акции-retry-4 .

См. в этой статье как контролировать свое кластера Kafka.

Способ более разумного лечения, а также мониторинг и современная система распределенной трассировки, будет создавать процесс, в котором для каждой записи, опубликованной в DLT, квитанций и уведомлений, ChatOps должны были быть отправлены, чтобы персонал, ответственный.

Также необходимы соответствующие инструменты, чтобы, например, редактировать записи и положил их обратно в первоначальной теме.

Ну, это он! До следующего.

Оригинал: “https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e”