Тема с мертвыми письмами , Очередь с мертвыми письмами или в старый добрый португальский: Темы сообщения не доставляются . Являются темы, необходимые в распределенных системах, где связь является асинхронным, так и через брокеров, таких как Кафка.
Данные, которые поступают на эти темы прошли через все попытки возможных для обработки ошибок, и уже не остается ничего сделано, если бы не вмешательство человека. Assim, nao серия qualquer gato que llegara mensagens ou eventos сыворотка для публикадо, ум топико мертвая буква .
В мире Кафки тему dead-letter предназначена для записей, уничтожены, что по какой-то неустранимой ошибке и не могли быть успешно обработаны.
Не артиго сера демонстративно ума отменяет пункт реализации DLT без Кафки, использует Java spring. Для нетерпеливых 🧐 , эти источники:
фабио хосе/весна-кафка-dlt-бывший
Тема с мертвой буквой com Весна Кафка
Ошибки классификации
Перед тем, как написать какую-либо строку кода необходимо классифицировать ошибки и как будут лечить.
Существует два типа ошибок:
- восстанавливать
- не-восстанавливаемых
Ошибки восстанавливаемых или, которые могут быть обработаны, являются те, где-то подход будет горничная, чтобы попытаться завершить поток успешно.
Например, в Java, ошибки ConnectException
и UnknownHostException
могут быть переработаны с помощью retentativas, потому что, как правило, являются причиной неустойчивости однократно в сервис или в сети.
Уже не-восстанавливаемых те, что независимо от того, что было сделано, будет невозможно завершить поток успешно. Вскоре, эти кандидаты следовать непосредственно к теме dead-letter. Экомо пример, се вокет, Авро, о эро Автоматическое исключение пропущенного поля
указывает на отсутствие поля, необходимого, не имеющие ничего общего. Поэтому будет недоступен для повторной попытки, например.
И помимо технических ошибок, вы также должны сортировать свои ошибки бизнеса и выбрать стратегию, чтобы лечить их.
Хорошо, теперь мы рассмотрим, как реализовать DLT за технических проблем на Java. И вполне вероятно, что вы встретите эквиваленты на его языке или framework.
Восстанавливать
Ниже приведен список технические ошибки, извлекаемыми, что, во имя простоты, будут обработаны через retentativas.
ConnectException
UnknownHostException
ProductNotFoundException
: к примеру, эта ошибка бизнес был определен как лечить. Поскольку услуга называется Каталог , что делает event-sourcing изменений, выданных службой Продукции , еще не обработал событие товара. Но через повторной попытки его поток можно завершить успешно.
Не-восстанавливаемых
Вот некоторые из ошибок, не обратимы, то есть, если, проходящих через один и тот же процесс retentativas только consumiriam ресурсов, без шансов завершить успешно.
AvroMissingFieldException
NullPointerException
ClassCastException
RecordTooLargeException
StockNotAvailableException
:, как, например, этот ошибке бизнес был оценен как неустранимые. Потому что отсутствие продуктов на складе не будет решен с retentativas, например.
Выполнение
Имеется хорошая статья в блог инженеров Uber, которая описывает архитектуру для dead-letter. Стоит читать!
Эта реализация была разделена на три части:
- топика
- строительство
- лечение
Топика
Для каждого процесс, который придется лечить dead-letter будет создан раздел с суффиксом -dlt
. Возьмем в качестве примера процесс, который осуществляет бронирование, акции от заказов, опубликованные в теме порядок покупки
. Поэтому, вместо того, чтобы создавать темы порядке-куплю-dlq, используется имя соответствующего процесса, происходит сбой,:
заказать шток-dlt
Электронная почта os tópicos para retentativas, вопрос о необходимости проведения форума quantos на заключительном этапе регистрации в чегаре ао топико мертвое письмо . Скажем, что будет максимум четыре retentativas кроме того, начальный:
заказать шток-retry-1
заказать шток-retry-2
заказать шток-retry-3
заказать шток-retry-4
Разделы должны иметь характеристики, которые не помешают в ходе retentativas или в публикации, на тему dead-letter. Пример конфигурации Пример конфигурации
, que pode acarretar um ошибка шамадо
сообщение слишком большое
Строительство
Здание было сделано с Spring Kafka, потому что он уже поставляется с множеством утилит для лечения dead-letter , что очень помогает производительности, является ли она сосредоточиться на своем проекте. Но ничто не мешает вам написать одно и то же решение с Клиентами Кафка на Java или на ваш язык вашего проекта.
Чтобы использовать ресурсы, предназначенные для dead-letter, необходимо настроить фабрику объектов поручено производить listeners Кафка без весны.
Таким образом, конфигурация программного был под-разделить на три части:
решить
отвечает за определение целевой раздел реестра, который находится в контексте ошибки.errorHandler
: обработчик ошибкиkafkaListernerContainerFactory
: производит экземпляров, которые используются в методах, аннотированный с@KafkaListener
И для каждого из подразделений имеется реализации Main и другие Retry . Как можно себе представить, является уход за параметров для основной обработки, другой для retentativas.
Main решения , ответственность за определение того, какой раздел назначения, основанные на ошибки-корень, выпустила на регистрацию употреблять в теме заказ-на покупку
:
@Bean public BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > mainResolver() { return new BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > () { @Override public TopicPartition apply(ConsumerRecord < ? , ? > r, Exception e) { // #### // Por padrão, quando é não-recuperável, segue diretamente p/ dead-letter TopicPartition result = new TopicPartition(dltTopic, QUALQUER_PARTICAO); // #### // Trata-se de um erro recuperável? final boolean recuperavel = isRecuperavel(e); if (recuperavel) { Optional < String > origem = topicoOrigem(r.headers()) .or(() -> Optional.of(NENHUM_CABECALHO)); // #### // Se origem for outro tópico, segue para o primeiro retry String destino = origem .filter(topico -> !topico.matches(retryTopicsPattern)) .map(t -> retryFirstTopic) .orElse(dltTopic); result = new TopicPartition(destino, QUALQUER_PARTICAO); } return result; } }; }
Main error handler , который использует Главный преобразователь и отвечает за два параметра, необходимых для обработки ошибок:
DeadLetterPublishingRecoverer
: запуск потока DLT, который является делегатом по SeekToCurrentErrorHandler если retentativas местах, не устранило ошибку.SeekToCurrentErrorHandler
: обрабатывает любые ошибки, будет запущен метод, аннотированный с@KafkaListener
. Конечно, если вы захватить их сcatch
и не позволяет им перерастут в стек , нельзя относиться к ним.
@Bean public SeekToCurrentErrorHandler mainErrorHandler( @Qualifier("mainResolver") BiFunction < ConsumerRecord < ? , ? > , Exception, TopicPartition > resolver, KafkaTemplate < ? , ? > template) { // #### // Recuperação usando dead-letter DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, resolver); // #### // Tentar 3x localmente antes de iniciar o fluxo dead-letter SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer, RETENTAR_3X); // #### // Lista das exceções não-recuperáveis, para evitar o retry local excecoes.getNaoRecuperavies().forEach(e -> handler.addNotRetryableException(e)); return handler; }
Main listener factory , поручено изготовить потребителей, которые обрабатывают записи tópíco порядке,-покупки
.
@Bean public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String, GenericRecord >> mainKafkaListenerContainerFactory( @Qualifier("mainErrorHandler") SeekToCurrentErrorHandler errorHandler, KafkaProperties properties, ConsumerFactory < String, GenericRecord > factory) { ConcurrentKafkaListenerContainerFactory < String, GenericRecord > listener = new ConcurrentKafkaListenerContainerFactory < > (); listener.setConsumerFactory(factory); // #### // Utilizando o mainErrorHandler para tratar os erros listener.setErrorHandler(errorHandler); // Falhar, caso os tópicos não existam? listener.getContainerProperties() .setMissingTopicsFatal(missingTopicsFatal); // Commit do offset no registro, logo após processá-lo no listener listener.getContainerProperties().setAckMode(AckMode.MANUAL); // Commits síncronos listener.getContainerProperties().setSyncCommits(Boolean.TRUE); return listener; }
Так что просто аннотировать метод, как показано ниже.
Но стоит отметить, что смещение всегда должна быть подтверждена, так как весь поток, который обрабатывает ошибки будет сделано повторной попытки и, возможно, кульминацией в разделе dead-letter. Проанализируйте свой случай-польза и поймите, если такой подход также применяется.
@KafkaListener( id = "main-kafka-listener", topics = "${app.kafka.consumer.topics}", containerFactory = "mainKafkaListenerContainerFactory" ) public void consume(@Payload ConsumerRecord < String, GenericRecord > record, Acknowledgment ack) throws Exception { try { // #### // Processar process(record); } finally { // #### // Sempre confirmar o offset ack.acknowledge(); } }
Уже в конфигурации для обработки retentativas ниже форм подобных, с некоторыми исключениями:
устранить
возвращает которой следующая тема в последовательности retentativas или, если этозабронировать-акции-dlt
, если у них уже пройденные все.errorHandler
: нет цепкой месте.
Это метод, аннотированный с @KafkaListener
что будет рассматривать все темы забронировать-акции-retry
:
@KafkaListener( id = "retry-kafka-listener", topicPattern = "${app.kafka.dlt.retry.topics.pattern}", containerFactory = "retryKafkaListenerContainerFactory", properties = { "fetch.min.bytes=${app.kafka.dlt.retry.min.bytes}", "fetch.max.wait.ms=${app.kafka.dlt.retry.max.wait.ms}" } ) public void retry(@Payload ConsumerRecord < String, GenericRecord > record, Acknowledgment ack) throws Exception { try { // #### // Reprocessar process(record); } finally { // #### // Sempre confirmar o offset ack.acknowledge(); } }
topicPattern
: потреблять все темызабронировать-акции-retry
выборка.мин.байт
efetch.max.wait.ms
используются, чтобы вызвать некоторую задержку в потреблении из записи, видно, что без них потребление будет практически мгновенный.
Наконец, application.properties
да будет так:
app.kafka.dlt.retry.topics=4 app.kafka.dlt.retry.topics.pattern=reservar-estoque-retry-[0-9]+ app.kafka.dlt.retry.topic.first=reservar-estoque-retry-1 app.kafka.dlt.topic=reservar-estoque-dlt # Lista de exceções recuperáveis app.kafka.dlt.excecoes.recuperaveis[0]=java.net.ConnectException app.kafka.dlt.excecoes.recuperaveis[1]=java.net.UnknownHostException # Lista de exceções não-recuperáveis app.kafka.dlt.excecoes.naoRecuperaveis[0]=org.apache.avro.AvroMissingFieldException app.kafka.dlt.excecoes.naoRecuperaveis[1]=java.lang.NullPointerException # Provocar atraso no processmento de retentativa app.kafka.dlt.retry.max.wait.ms=20000 app.kafka.dlt.retry.min.bytes=52428800
Всего реализация доступен на Github, клонируйте его и получайте удовольствие!
фабио хосе/весна-кафка-dlt-бывший
Тема с мертвой буквой com Весна Кафка
Dead-letter Бы с Spring Кафка и формат данных Avro.
Заявление
- JDK 1.8
- Apache Maven 3.6+
- Докер 19+
- Доступ к хранилищу Доступ к хранилищу или альтернативы с бесплатным зависимостей, имеющихся в
или альтернативы с бесплатным зависимостей, имеющихся в
- Реестр схем
- Кафка
- Тестовые контейнеры
- Ломбок
Конфигурация
Не волнуйтесь, потому что несмотря на то, что есть и ярлыки по variávies среды, вы можете спокойно использовать то, что Spring Boot предлагает. Тогда смотрите все свойства Тогда смотрите все свойства
В случае, если Кафка, используем Spring, Kafka, то можно использовать режим Spring для настройки.
Переменные Среды
APP_KAFKA_CONSUMER_TOPICS
: разделы, чтобы потреблять, или выражение.КАФКА_КЛИЕНТ_ИД
: : имя клиента Кафки, usado pelos брокеры в лос-э-метрикас. Использование имя clean, а не общий.
spring.кафка.производитель.идентификатор клиента,
spring.кафка.потребитель.идентификатор клиента
KAFKA_CONSUMER_GROUP: имя группы потребления, что это приложение принадлежит
KAFKA_CONSUMER_GROUP
: имя группы потребления, что это приложение принадлежитKAFKA_BOOTSTRAP_SERVERS
: список брокеров для кластера Kafka
кластера KafkaSCHEMA_REGISTRY_URL
: url-адрес для записи схемы Avro
Когда запись достигает последней темы от повторной попытки, в этом случае забронировать-акции-4
и не будет обработан успешно, и, наконец, он будет опубликован в разделе dead-letter , то команда должна быть готова для лечения.
Лечение
Ну, на данный момент запись с проблемами уже приземлился в разделе забронировать-акции-dlt
и еще до того, как произойдет точная система для мониторинга темы, должны быть предупреждены об использовании разделов для retentativas, особенно если записи достигают в забронировать-акции-retry-4
.
См. в этой статье как контролировать свое кластера Kafka.
Способ более разумного лечения, а также мониторинг и современная система распределенной трассировки, будет создавать процесс, в котором для каждой записи, опубликованной в DLT, квитанций и уведомлений, ChatOps должны были быть отправлены, чтобы персонал, ответственный.
Также необходимы соответствующие инструменты, чтобы, например, редактировать записи и положил их обратно в первоначальной теме.
Ну, это он! До следующего.
Оригинал: “https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e”