Рубрики
Без рубрики

Обрабатывать обратное давление между Kafka и базой данных с помощью Vert.x

Как мы уже обсуждали в прошлом, асинхронное программирование приносит много плюсов в разработке реактивного… Помеченный как vertx, противодавление, java, асинхронный.

Как мы уже обсуждали в прошлом, асинхронное программирование приносит много преимуществ при разработке реактивных и отзывчивых приложений. Однако это также несет в себе недостатки и проблемы, одной из главных из которых является проблема противодавления.

Что такое противодавление?

В физике

это сопротивление или сила, противодействующая желаемому потоку жидкости по трубам ( википедия )

Мы можем перевести проблему в известный сценарий: сохранение сообщений, извлеченных из шины, где на шине находится огромное количество сообщений, которые наше приложение опрашивает очень быстро, но база данных под ним работает очень медленно.

Как можно избежать переполнения воронки?

В синхронном сценарии проблемы с обратным давлением нет, синхронизирующий характер вычислений блокирует опрос по шине до тех пор, пока не будет обработано текущее сообщение. Но в асинхронном мире опрос выполняется без подсказки о том, что происходит в базе данных. Таким образом, если база данных не сможет обрабатывать все сообщения, поступающие с шины, сообщения останутся “промежуточными”, то есть в памяти нашего сервиса. Это может привести к сбоям или, в худшем случае, к сбою в обслуживании.

Давайте попробуем разработать приложение, которое сохраняет сообщения в базе данных, и заставим его развиваться для обработки противодавления

Автоматический опрос

Сначала наша вертикаль будет выполнять эти операции:

  • инициализируйте клиент JDBC
  • инициализируйте клиент Kafka
  • подписаться на тему
  • сохраняйте записи

Код довольно прост, и он хорошо работает с небольшими объемами сообщений. Когда нагрузка становится все больше и больше, возникает проблема: использование обработчика Vertx Kafka consumer означает, что нет контроля над соотношением сообщений, поэтому он будет непрерывно опрашивать без учета скорости сохранения, вызывая перегрузку памяти.

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map kafkaConsumerConfiguration() {
    final Map config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }


  private Future persist(JDBCClient jdbc, KafkaConsumerRecord record) {
    Promise promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}

Явный опрос

Для обработки противодавления должен использоваться явный опрос, и это можно сделать, избегая настройки обработчика потребителя kafka и вызывая poll вручную (в следующем случае каждые 100 мс). Используя этот подход, можно сделать так, чтобы каждый опрос выполнялся только тогда, когда сохраняется пакет ранее опрошенных сообщений. Такое поведение может быть достигнуто путем обработки каждого сообщения persist future и сбора их всех с помощью CompositeFuture.all , который будет успешным только тогда, когда все сообщения будут завершены, и только в этом случае может быть выполнен следующий опрос. Если хотя бы одно из будущих завершится неудачей, все завершится неудачей, и опрос прекратится. Существуют различные решения, которые могут быть адаптированы для того, чтобы служба справилась со сбоем, например, отправка сообщения в Очередь мертвых писем , но мы не будем рассматривать этот случай.

Проблема с этим кодом заключается в том, что в случае сбоя сообщения мы его потеряем, потому что потребитель настроен на автоматическую фиксацию, поэтому именно vertx фиксирует смещение темы.

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer consumer = KafkaConsumer
      .create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer consumer) {
    Promise> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause))
    ;
  }

  private Map kafkaConsumerConfiguration() {
    final Map config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}

Ручная фиксация

Установив для свойств ENABLE_AUTO_COMMIT_CONFIG значение false , служба становится владельцем фиксации смещения темы. Фиксация будет выполнена только тогда, когда каждое сообщение будет сохранено, с помощью этого трюка по крайней мере один раз доставка достигнута.

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer consumer = KafkaConsumer
      .create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer consumer) {
    Promise> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        Promise commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause))
    ;
  }

  private Map kafkaConsumerConfiguration() {
    final Map config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}

Бонусная функция: достижение порядка

Приложив немного усилий, можно добиться упорядочения: будущая композиция позволяет заставить каждую операцию сохранения ждать завершения своего прецедента. Это достижимо путем привязки асинхронных вычислений друг к другу, так что все они будут выполнены, когда прецедентное будущее завершится успешно. Это умный шаблон, который следует использовать, когда требуется сериализация .

public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer consumer) {
    Promise> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        .reduce(Future.succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a,b) -> a
        )
      )
      .compose(composite -> {
        Promise commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }
  ...
}

Вывод

Противодавление – это фундаментальная тема, которую необходимо охватить при работе с асинхронным программированием. Это не предоставляется бесплатно из vert.xbox, но этого можно достичь с помощью некоторых простых трюков.

Оригинал: “https://dev.to/cherrychain/handle-backpressure-between-kafka-and-a-database-with-vert-x-5a3j”