Рубрики
Без рубрики

Анализ Apache Kafka __потребитель_смещения с помощью команды Kafka и Java API

__consumer смещения-это тема, в которой Apache Kafka хранит смещения. С тех пор, как Кафка перенес хранилище смещений из Zookeeper, чтобы избежать проблем с масштабируемостью, смещения пользователей-это единственная тема…

Автор оригинала: Phani Kumar Yadavilli.

__consumer_offsets – это тема, в которой Apache Kafka хранит смещения. С тех пор как Кафка перенес хранилище смещений из Zookeeper, чтобы избежать проблем с масштабируемостью, смещения __потребителей_ – это та тема, которая занимала центральное место в управлении смещениями для всех потребителей.

По умолчанию потребители не могут использовать тему __consumer_offsets, поскольку это внутренняя тема. Поэтому мы должны включить параметр исключить.внутренние.темы в значение false перед использованием этой темы.

Если мы хотим использовать сценарии командной строки Кафки для использования сообщений, мы можем использовать приведенную ниже команду

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config consumer.conf --from-beginning
echo "exclude.internal.topics=false" > consumer.config

Мы не можем получить прямой доступ к данным, так как Кафка хранит эту информацию в двоичном формате. Чтобы прочитать эти данные, нам нужно использовать класс форматирования “кафка.координатор.группа. GroupMetadataManager$Смещает MessageFormatter” , который преобразует двоичные данные в удобочитаемый формат. Аналогично, я написал служебный класс, который анализирует данные темы __consumer_offsets, используя API клиента-потребителя Кафки.

byte[] key = consumerRecord.key();
byte[] value;
if(key != null) {
Object o = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key));
if(o!= null && o instanceof OffsetKey) {
      OffsetKey offsetKey = (OffsetKey) o;
      value = consumerRecord.value();
      offsetAndMetadata offsetAndMetadata =          GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
}

Вы можете найти полный фрагмент кода, который преобразует метаданные __consumer_offsets в формат JSON, по ссылке ниже на GitHub. Вы можете найти полный фрагмент кода, который преобразует метаданные __consumer_offsets в формат JSON, по ссылке ниже на GitHub.

Оригинал: “https://www.codementor.io/@phanikumaryadavilli/parsing-apache-kafka-__consumer_offsets-using-kafka-command-and-java-api-1kb916n667”