Рубрики
Без рубрики

Играю С Apache Storm В Docker – Как Босс

Эта статья не является окончательным руководством по Шторму и не предназначена для этого. Шторм довольно сильный и справедливый… Помеченный как storm, java, docker, data.

Эта статья не является окончательным руководством по Storm и не предназначена для этого. Шторм довольно огромен, и всего одно длинное чтение, вероятно, в любом случае не сможет оценить его должным образом. Конечно, любые дополнения, отзывы или конструктивная критика будут высоко оценены. Хорошо, теперь, когда с этим покончено, давайте посмотрим, что мы будем освещать:

  • Необходимость шторма, его “почему”, что это такое и чем это не является
  • Взгляд с высоты птичьего полета на то, как это работает.
  • Как примерно выглядит топология Storm в коде (Java)
  • Настройка и работа с производственным кластером Storm в Docker.
  • Несколько слов о надежности обработки сообщений.

Я также предполагаю, что вы, по крайней мере, немного знакомы с Docker и контейнеризацией.

Непрерывные потоки данных распространены повсеместно и становятся еще более распространенными с увеличением числа используемых устройств Интернета вещей . Конечно, эти данные хранятся, обрабатываются и анализируются для получения прогнозируемых, действенных результатов. Но анализ петабайт занимает много времени, даже с помощью Hadoop (как бы хорош ни был MapReduce) или Spark (средство устранения ограничений MapReduce). Во-вторых, очень часто нам не нужно выводить закономерности в течение длительных периодов времени. Из петабайт поступающих данных, собранных за месяцы, в любой данный момент , нам, возможно, не потребуется учитывать все это, просто снимок в реальном времени. Возможно, нам не нужно знать самый длинный трендовый хэштег за последние пять лет, а только один прямо сейчас. Это то, для чего создан Storm, чтобы принимать тонны данных, поступающих чрезвычайно быстро, возможно, из разных источников, анализировать их и публиковать обновления в реальном времени в пользовательском интерфейсе или в каком-либо другом месте без сохранения каких-либо данных .

как это работает

Архитектуру Storm можно сравнить с сетью дорог, соединяющих множество контрольно-пропускных пунктов. Трафик начинается на определенной контрольной точке (называемой spout ) и проходит через другие контрольные точки (называемые bolts ). Трафик – это, конечно, поток данных, который извлекается с помощью spout (из источника данных, например, общедоступного API) и перенаправляются в различные bolts , где данные фильтруются, очищаются, агрегируются, анализируются, отправляются в пользовательский интерфейс для просмотра людьми или любой другой цели. Сеть из носиков и болтов называется топологией , а потоки данных представлены в виде кортежей (список значений, которые могут иметь разные типы).

Источник: Источник:

Одна важная вещь, о которой следует поговорить, – это направление трафика данных. Обычно у нас был бы один или несколько видов спорта, считывающих данные из API, Kafka topic или какой-либо другой системы массового обслуживания. Затем данные будут передаваться в одну сторону на один или несколько болтов, которые могут перенаправлять их на другие болты и так далее. Bolts может публиковать проанализированные данные в пользовательском интерфейсе или в другом bolt. Но движение почти всегда однонаправленное , как у СОБАКИ. Хотя, безусловно, возможно создавать циклы, нам вряд ли понадобится такая запутанная топология.

Установка Storm release включает в себя ряд шагов, которые вы можете выполнить на своем компьютере. Но позже я буду использовать контейнеры Docker для развертывания кластера Storm, и образы позаботятся о настройке всего, что нам нужно.

Какой-то Код

В то время как Storm предлагает поддержку других языков , большинство топологий написаны на Java, поскольку это наиболее эффективный вариант, который у нас есть.

Очень простой носик, который просто выдает случайные цифры, может выглядеть следующим образом:

public class RandomDigitSpout extends BaseRichSpout 
{
  // To output tuples from spout to the next stage bolt
  SpoutOutputCollector collector;  

  public void nextTuple() 
  {
    int randomDigit = ThreadLocalRandom.current().nextInt(0, 10);

    // Emit the digit to the next stage bolt
    collector.emit(new Values(randomDigit));
  }

  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
  {
    // Tell Storm the schema of the output tuple for this spout.
    // It consists of a single column called 'random-digit'.
    outputFieldsDeclarer.declare(new Fields("random-digit"));
  }
}

И простой болт, который принимает поток случайных и просто испускает четные:

public class EvenDigitBolt extends BaseRichBolt 
{
  // To output tuples from this bolt to the next bolt.
  OutputCollector collector;

  public void execute(Tuple tuple) 
  {
    // Get the 1st column 'random-digit' from the tuple
    int randomDigit = tuple.getInt(0);

    if (randomDigit % 2 == 0) {
      collector.emit(new Values(randomDigit));
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) 
  {
    // Tell Storm the schema of the output tuple for this bolt.
    // It consists of a single column called 'even-digit'
    declarer.declare(new Fields("even-digit"));
  }
}

Еще один простой болт, который получит отфильтрованный поток из Четного болта , и просто умножит каждую четную цифру на 10 и отправит ее вперед:

public class MultiplyByTenBolt extends BaseRichBolt 
{
  OutputCollector collector;

  public void execute(Tuple tuple) 
  {
    // Get 'even-digit' from the tuple.
    int evenDigit = tuple.getInt(0);

    collector.emit(new Values(evenDigit * 10));
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) 
  {
    declarer.declare(new Fields("even-digit-multiplied-by-ten"));
  }
}

Соединяем их вместе, чтобы сформировать нашу топологию:

package packagename
// ...

public class OurSimpleTopology { 

  public static void main(String[] args) throws Exception
  {
    // Create the topology
    TopologyBuilder builder = new TopologyBuilder();

    // Attach the random digit spout to the topology.
    // Use just 1 thread for the spout.
    builder.setSpout("random-digit-spout", new RandomDigitSpout());

    // Connect the even digit bolt to our spout. 
    // The bolt will use 2 threads and the digits will be randomly
    // shuffled/distributed among the 2 threads.
    // The third parameter is formally called the parallelism hint.
    builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
           .shuffleGrouping("random-digit-spout");

    // Connect the multiply-by-10 bolt to our even digit bolt.
    // This bolt will use 4 threads, among which data from the
    // even digit bolt will be shuffled/distributed randomly.
    builder.setBolt("multiplied-by-ten-bolt", new MultiplyByTenBolt(), 4)
           .shuffleGrouping("even-digit-bolt");

    // Create a configuration object.
    Config conf = new Config();

    // The number of independent JVM processes this topology will use.
    conf.setNumWorkers(2);

    // Submit our topology with the configuration.
    StormSubmitter.submitTopology("our-simple-topology", conf, builder.createTopology());
  }
}

Параллелизм В Штормовых Топологиях

Полное понимание параллелизма в Storm может быть непростой задачей, по крайней мере, по моему опыту. Для работы с топологией требуется по крайней мере один процесс (очевидно). В рамках этого процесса мы можем распараллелить выполнение наших носиков и болтов с помощью резьбы. В нашем примере randomdigitspout запустит только один поток, и данные, передаваемые из этого потока, будут распределены между 2 потоками EvenDigitBolt . Но способ, которым происходит это распределение, называемый группировкой потоков , может быть важным. Например, у вас может быть поток записей температуры из двух городов, где кортежи, испускаемые носиком, выглядят следующим образом:

// City name, temperature, time of recording

("Atlanta",       94, "2018-05-11 23:14")
("New York City", 75, "2018-05-11 23:15")
("New York City", 76, "2018-05-11 23:16")
("Atlanta",       96, "2018-05-11 23:15")
("New York City", 77, "2018-05-11 23:17")
("Atlanta",       95, "2018-05-11 23:16")
("New York City", 76, "2018-05-11 23:18")

Предположим, мы прикрепляем только один болт, задача которого состоит в том, чтобы рассчитать изменение средней температуры в каждом городе. Если мы можем разумно ожидать, что в любой заданный интервал времени мы получим примерно равное количество кортежей из обоих городов, было бы разумно выделить 2 потока для нашего bolt и отправить данные для Атланты в один из них, а Нью-Йорк – в другой. Нашей цели послужила бы группировка полей , которая распределяет данные между потоками по значению поля, указанного в группировке:

// The tuples with the same city name will go to the same thread.
builder.setBolt("avg-temp-bolt", new AvgTempBolt(), 2)
       .fieldsGrouping("temp-spout", new Fields("city_name"));

И, конечно же, существуют и другие типы группировок . Однако в большинстве случаев группировка, вероятно, не будет иметь большого значения, и вы можете просто перетасовать данные и случайным образом распределить их между потоками болтов ( группировка в случайном порядке ). Теперь есть еще один важный компонент: количество рабочих процессов, на которых будет выполняться наша топология. Общее количество потоков, которые мы указали, затем будет поровну разделено между рабочими процессами . Итак, в нашем примере топологии со случайной цифрой у нас была 1 резьба для носика, 2 четные резьбы для болтов и 4 резьбы для болтов, умноженные на десять (всего 7). Каждый из 2 рабочих процессов будет отвечать за выполнение 2 потоков болтов, умноженных на десять, 1 болта с четной цифрой, и один из процессов будет запускать 1 поток носика.

Конечно, у 2 рабочих процессов будут свои основные потоки, которые, в свою очередь, запустят потоки spout и bolt. Таким образом, в целом у нас будет 9 потоков. В совокупности они называются исполнителями .

Важно понимать, что если вы установите подсказку о параллелизме spout > 1 (т.Е. несколько исполнителей), вы можете в конечном итоге выдавать одни и те же данные несколько раз. Скажем, spout считывает данные из общедоступного API потока Twitter и использует двух исполнителей. Это означает, что пользователи, получающие данные из носика, получат один и тот же твит дважды. Только после того, как spout выдает кортежи, в игру вступает параллелизм данных, т.Е. кортежи распределяются между потоками в соответствии с указанной группировкой потоков.

Запускать несколько рабочих на одном узле было бы довольно бессмысленно. Позже, однако, мы будем использовать правильный распределенный многоузловой кластер и посмотрим, как рабочие распределены по разным узлам.

Построение Нашей Топологии

Вот структура каталогов, которую я предлагаю:

yourproject/
            pom.xml  
            src/
                jvm/
                    packagename/
                           RandomDigitSpout.java
                           EvenDigitBolt.java
                           MultiplyByTenBolt.java
                           OurSimpleTopology.java

Maven обычно используется для построения топологий Storm, и для этого требуется pom.xml файл (POM), который определяет различные детали конфигурации, зависимости проекта и т.д. . Вдаваться в подробности POM , вероятно, будет здесь излишним.

  • Сначала мы запустим mvn clean внутри вашего проекта , чтобы очистить все скомпилированные файлы, которые у нас могут быть, убедившись, что каждый модуль скомпилирован с нуля.
  • А затем mvn package скомпилировать наш код и упаковать его в исполняемый JAR-файл внутри недавно созданной папки target . В первый раз это может занять довольно много минут, особенно если ваша топология имеет много зависимостей.
  • Чтобы отправить свою топологию: storm jar target/packagename-{номер версии}.jar packagename. Наша Простая Топология

Будем надеяться, что к настоящему времени разрыв между концепцией и кодом в Storm был в некоторой степени преодолен. Однако никакое серьезное развертывание Storm не будет представлять собой единый экземпляр топологии, работающий на одном сервере.

Как Выглядит Грозовое Скопление

Чтобы в полной мере воспользоваться преимуществами масштабируемости Storm и отказоустойчивость , любая топология производственного уровня будет передана кластеру машин.

Дистрибутивы Storm устанавливаются на главном узле (Nimbus) и на всех подчиненных узлах (супервизорах). Узел master запускает демон Storm Nimbus и пользовательский интерфейс Storm. Узлы slave запускают демонов Storm Supervisor . Демон Zookeeper на отдельном узле используется для координации между главным узлом и подчиненными узлами. Zookeeper, кстати, используется только для управления кластером и никогда не для передачи каких-либо сообщений. Это не похоже на то, что носики и болты передают данные друг другу через него или что-то в этом роде. Демон Nimbus находит доступных Супервизоров через Zookeeper, в котором демоны Супервизора регистрируются самостоятельно. И другие управленческие задачи, некоторые из которых вскоре станут ясны.

Пользовательский интерфейс Storm – это веб-интерфейс, используемый для управления состоянием нашего кластера. Мы вернемся к этому позже.

Наша топология передается демону Nimbus на главном узле, а затем распределяется между рабочими процессами, запущенными на подчиненных/супервизорных узлах . Благодаря Zookeeper не имеет значения, сколько подчиненных/супервизорных узлов вы запускаете изначально, так как вы всегда можете легко добавить больше, и Storm автоматически интегрирует их в кластер.

Всякий раз, когда мы запускаем супервизор, он выделяет определенное количество рабочих процессов (которые мы можем настроить), которые затем могут быть использованы представленной топологией. Итак, на изображении выше всего 5 выделенных рабочих . Запомните эту строку: conf.setNumWorkers(5) Это означает, что топология попытается использовать в общей сложности 5 рабочих. И поскольку наши два узла супервизора имеют в общей сложности 5 выделенных рабочих : каждый из 5 выделенных рабочих процессов будет запускать один экземпляр топологии. Если бы мы выполнили: conf.setNumWorkers(4) , то один рабочий процесс остался бы незанятым/неиспользуемым. Если бы количество указанных рабочих было 6, а общее количество выделенных рабочих составляло 5, то из-за ограничения функционировали бы только 5 фактических рабочих топологии.

Прежде чем мы настроим все это с помощью Docker, следует иметь в виду несколько важных вещей, касающихся отказоустойчивости:

  • Если какой-либо рабочий на любом подчиненном узле умрет, демон супервизора перезапустит его. Если повторный перезапуск завершится неудачей, работник будет переназначен на другую машину.
  • Если весь подчиненный узел умирает, его доля работы будет передана другому узлу супервизора/подчиненного узла.
  • Если Нимбус выйдет из строя, рабочие останутся незатронутыми. Однако до тех пор, пока Nimbus не будет восстановлен, рабочие не будут переназначены на другие подчиненные узлы, если, скажем, их узел выйдет из строя.
  • Nimbus и супервизоры сами по себе не имеют состояния, но с помощью Zookeeper сохраняется некоторая информация о состоянии, чтобы все могло начаться с того места, где они были остановлены, если узел выйдет из строя или демон неожиданно умрет.
  • Демоны Nimbus, Supervisor и Zookeeper быстро отказывают. Это означает, что они сами не очень терпимы к неожиданным ошибкам и отключатся, если столкнутся с одной из них. По этой причине они должны запускаться под наблюдением с помощью сторожевой программы, которая постоянно отслеживает их и автоматически перезапускает, если они когда-либо выходят из строя . Supervisor , вероятно, является наиболее популярным вариантом для этого (не путать с демоном Storm Supervisor).

Примечание: В большинстве кластеров Storm сам Nimbus никогда не развертывается как отдельный экземпляр, а как кластер. Если эта отказоустойчивость не будет включена и наш единственный Нимб выйдет из строя, мы потеряем возможность отправлять новые топологии, изящно уничтожать запущенные топологии, переназначать работу другим узлам супервизора, если один из них выйдет из строя и т.д. . Для простоты в нашем иллюстративном кластере будет использоваться один экземпляр. Аналогично, Zookeeper очень часто развертывается как кластер, но мы будем использовать только один.

Привязка Кластера

Запуск отдельных контейнеров и всего, что с ними связано, может быть громоздким, поэтому я предпочитаю использовать Docker Compose . Изначально мы будем использовать один узел Zookeeper, один узел Nimbus и один узел Supervisor. Они будут определены как службы создания, все соответствующие одному контейнеру в начале. Позже я буду использовать Compose scaling , чтобы добавить еще один узел супервизора (контейнер). Вот наш весь код и структура проекта:

zookeeper/
         Dockerfile
storm-nimbus/
         Dockerfile
         storm.yaml
         code/
             pom.xml
             src/
                 jvm/
                     coincident_hashtags/
                                ExclamationTopology.java  
storm-supervisor/
         Dockerfile
         storm.yaml
docker-compose.yml

И наш docker-compose.yml :

version: '3.2'

services:
    zookeeper:
        build: ./zookeeper
        # Keep it running.  
        tty: true

    storm-nimbus:
        build: ./storm-nimbus
        # Run this service after 'zookeeper' and make 'zookeeper' reference.
        links:
            - zookeeper
        tty: true
        # Map port 8080 of the host machine to 8080 of the container.
        # To access the Storm UI from our host machine.
        ports:
            - 8080:8080
        volumes:
            - './storm-nimbus:/theproject'

    storm-supervisor:
        build: ./storm-supervisor
        links:
            - zookeeper
            - storm-nimbus
        tty: true

# Host volume used to store our code on the master node (Nimbus).
volumes:
    storm-nimbus:

Не стесняйтесь изучать файлы Dockerfiles. В основном они просто устанавливают зависимости (Java 8, Storm, Maven, Zookeeper и т.д.) В соответствующие контейнеры. Файлы storm.yaml переопределяют определенные конфигурации по умолчанию для установок Storm. Строка ADD storm.yaml/конфигурация внутри Nimbus и Supervisor Dockerfiles помещает их в контейнеры, где Storm может их прочитать. шторм-нимб/storm.yaml :

# The Nimbus needs to know where the Zookeeper is. This specifies the list of the
# hosts in the Zookeeper cluster. We're using just one node, of course.
# 'zookeeper' is the Docker Compose network reference.
storm.zookeeper.servers:
  - "zookeeper"

шторм-супервайзер/storm.yaml :

# Telling the Supervisor where the Zookeeper is.
storm.zookeeper.servers:
  - "zookeeper"

# The worker nodes need to know which machine(s) are the candidate of master
# in order to download the topology jars.
nimbus.seeds : ["storm-nimbus"]

# For each Supervisor, we configure how many workers run on that machine. 
# Each worker uses a single port for receiving messages, and this setting 
# defines which ports are open for use. We define four ports here, so Storm will 
# allocate up to four workers to run on this node.
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

Эти варианты подходят для нашего кластера. Более любопытные могут ознакомиться со всеми конфигурациями по умолчанию здесь .

Запустите docker-compose up в корне проекта.

После того, как все образы будут созданы и вся служба запущена, откройте новый терминал, введите docker ps и вы увидите что-то вроде этого:

Запускаю Нимбус

Давайте подключимся к контейнеру Nimbus по SSH, используя его имя: docker exec -it coincidenthashtagswithapachestorm_storm-nimbus_1 bash а затем запустите демон Nimbus: штормовой нимб

Запуск пользовательского интерфейса Шторма

Аналогично, откройте другой терминал, снова подключитесь к Nimbus по SSH и запустите пользовательский интерфейс с помощью storm пользовательский интерфейс :

Перейдите в localhost:8080 в вашем браузере, и вы увидите хороший обзор нашего кластера:

Свободные слоты в Сводке кластера указывают, сколько всего рабочих (на всех узлах супервизора) доступно и ожидает, пока топология их использует. Использованные Слоты укажите, сколько из общего числа в настоящее время занято топологией. Поскольку мы еще не запустили ни одного Супервайзера, они оба равны нулю. Мы доберемся до Исполнителей и Задачи позже. Кроме того, как мы видим, еще не было представлено никаких топологий.

Запуск Узла Супервизора

Войдите по SSH в контейнер one Supervisor и запустите демон супервизора: docker exec -это совпадающие хэштеги с apachestorm_storm-supervisor_1 bash руководитель шторма

Теперь давайте обновим ваш пользовательский интерфейс:

Примечание: Любые изменения в нашем кластере могут занять несколько секунд, чтобы отразиться на пользовательском интерфейсе.

У нас есть новый управляющий супервайзер, который поставляется с четырьмя выделенными работниками. Эти четыре рабочих являются результатом указания четырех портов в нашем storm.yaml для узла Супервизора. Конечно, все они бесплатны (четыре Бесплатных слота ). Давайте отправим топологию в Nimbus и приступим к работе.

Отправка Топологии В Nimbus

SSH в Nimbus на новом терминале. Я написал файл Dockerfile так что мы попадаем в наш рабочий (целевой) каталог /проект . Внутри этого находится код , где находится наша топология. Наша топология довольно проста . Он использует носик, который генерирует случайные слова, и болт, который просто добавляет три восклицательных знака (!!!) к словам. Два из этих болтов добавляются спина к спине, и поэтому в конце потока мы получим слова с шестью восклицательными знаками. В нем также указывается, что ему нужны три работника ( conf.setNumWorkers(3) ).

public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);

    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");

    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();

    // Turn on  debugging mode
    conf.setDebug(true);

    conf.setNumWorkers(3);

    StormSubmitter.submitTopology("exclamation-topology", conf, builder.createTopology());
}
  1. код компакт-диска
  2. mvn чистый
  3. пакет mvn
  4. штормовая банка target/coincident-hashtags-1.2.1.jar совпадающие хэштеги. Восклицательная топология

После успешной отправки топологии обновите пользовательский интерфейс:

Как только мы представили топологию, Смотритель зоопарка был уведомлен об этом. Смотритель зоопарка, в свою очередь, уведомил Супервайзера о необходимости загрузить код с Nimbus. Теперь мы видим нашу топологию вместе с тремя занятыми рабочими, оставляя только одного свободным. И 10 потоков с выводом слов + 3 потока с восклицанием 1 болт + 2 потока с восклицанием болт + 3 основных потока от рабочих = всего 18 исполнителей . И вы, возможно, заметили что-то новое: задачи .

WTF – Это Задачи

Еще одна концепция параллелизма Шторма. Но не волнуйтесь, задача – это всего лишь экземпляр носика или болта, который использует исполнитель; что на самом деле выполняет обработку. По умолчанию количество задач равно количеству исполнителей. В редких случаях вам может потребоваться, чтобы каждый исполнитель создавал дополнительные задачи.

// Each of the two executors (threads) of this bolt will instantiate
// two objects of this bolt (total 4 bolt objects/tasks).
builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
       .setNumTasks(4) 
       .shuffleGrouping("random-digit-spout");

Это недостаток с моей стороны, но я не могу придумать хорошего варианта использования, в котором нам понадобилось бы несколько задач для каждого исполнителя. Возможно, если бы мы сами добавляли некоторый параллелизм, например, создавали новый поток внутри bolt для обработки длительной задачи, тогда основной поток-исполнитель не будет блокироваться и сможет продолжить обработку с помощью другого bolt. Однако это может затруднить понимание нашей топологии. Если кто-нибудь знает о сценариях, в которых выигрыш в производительности от нескольких задач перевешивает дополнительную сложность, пожалуйста, оставьте комментарий.

В любом случае, возвращаясь с этого небольшого обхода, давайте посмотрим обзор нашей топологии. Нажмите на название в разделе Сводка топологии и прокрутите вниз до Рабочие ресурсы : Мы можем четко видеть разделение наших исполнителей (потоков) между 3 рабочими. И, конечно же, все 3 работника находятся на одном и том же узле супервизора, который мы запускаем.

Теперь, давайте скажем, масштабируйтесь!

Добавьте Еще Одного Руководителя

Из корня проекта давайте добавим еще один узел супервизора/контейнер docker-compose scale

SSH в новый контейнер: docker exec -это совпадающие хэштеги с apachestorm_storm-supervisor_2 bash

И запустите: штормовой супервайзер

Если вы обновите пользовательский интерфейс, вы увидите, что мы успешно добавили еще одного супервайзера и еще четырех работников (всего 8 работников/слотов). Чтобы по-настоящему воспользоваться преимуществами нового супервайзера, давайте увеличим количество работников топологии.

  • Сначала убейте бегущего: шторм убивает восклицание-топология
  • Изменение эта строка для: conf.setNumWorkers(6)
  • Измените номер версии проекта в вашем pom.xml . Попробуйте использовать правильную схему, например семантическое управление версиями. Я просто буду придерживаться версии 1.2.1.
  • Перестроить топологию: пакет mvn
  • Отправьте его повторно: storm jar target/coincident-hashtags-1.2.1.jar совпадающие хэштеги. Восклицательная топология

Перезагрузите пользовательский интерфейс: Теперь вы можете увидеть нового руководителя и 6 занятых работников из 8 доступных. Также важно отметить, что 6 занятых были поровну разделены между двумя руководителями. Снова щелкните имя топологии и прокрутите вниз. Мы видим два уникальных идентификатора супервизора, оба запущенных на разных узлах, и все наши исполнители довольно равномерно распределены между ними. Это здорово. Но Storm предлагает еще один отличный способ сделать это во время выполнения топологии . Что – то под названием перебалансировка . На Нимбусе мы бы запустили: восклицание восстановления баланса шторма-топология -n 6 (перейти от 3 до 6 рабочих) Или изменить количество исполнителей для конкретного компонента: шторм перебалансировать восклицание-топология -e

Надежная Обработка Сообщений

Один вопрос, который мы еще не рассмотрели, касается того, что произойдет, если bolt не сможет обработать кортеж. Что ж, Storm предоставляет нам механизм, с помощью которого исходный поток (в частности, task ) может воспроизвести неудачный кортеж. Эта гарантия обработки не возникает сама по себе, это сознательный выбор дизайна, который увеличивает задержку. Носики отправляют кортежи на болты, которые передают кортежи, полученные из входных кортежей, другим болтам и так далее. Этот один, оригинальный кортеж порождает целое дерево кортежей. Если какой-либо дочерний кортеж, так сказать, исходного кортежа выходит из строя, то любые меры по исправлению положения (откаты и т.д.) Вполне могут потребоваться в несколько этапов. Это может стать довольно запутанным, и поэтому то, что делает Storm, заключается в том, что он позволяет исходному кортежу снова излучаться прямо из источника (носика). Следовательно, любые операции, выполняемые bolts, которые являются функцией входящих кортежей, должны быть идемпотентными . Кортеж считается “полностью обработанным”, когда каждый кортеж в его дереве был обработан, и каждый кортеж должен быть явно подтвержден пользователем. Однако это еще не все. Есть еще одна вещь, которую необходимо сделать явно: поддерживать связь между исходным кортежем и его дочерними кортежами. Затем Storm сможет отследить происхождение дочерних кортежей и, таким образом, сможет воспроизвести исходный кортеж. Это называется привязкой . И это было сделано в нашем восклицательном знаке :

// ExclamationBolt

// 'tuple' is the original one received from the test word spout.
// It's been anchored to/with the tuple going out.
_collector.emit(tuple, new Values(exclamatedWord.toString()));

// Explicitly acknowledge that the tuple has been processed.
_collector.ack(tuple);

Вызов ack приведет к вызову метода ack в spout, если он был реализован. Итак, скажем, вы считываете данные кортежа из некоторой очереди и можете удалить их из очереди только в том случае, если кортеж был полностью обработан. Ну, метод ack – это то, где вы могли бы это сделать. Вы также можете выдавать кортежи без привязки: _collector.emit(новые значения(exclamatedWord.toString())) и отказаться от надежности.

Кортеж может выйти из строя двумя способами: i) Болт умирает, и время ожидания кортежа истекает. Или время ожидания истекает по какой-то другой причине. Тайм-аут по умолчанию составляет 30 секунд и может быть изменен с помощью config.put(Config. TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60) ii) Метод fail явно вызывается для кортежа в bolt: _collector.fail(кортеж) . Вы можете сделать это в случае исключения.

В обоих этих случаях будет вызван метод fail в spout, если он реализован. И если мы хотим, чтобы кортеж был заменен, это должно быть сделано явно в методе fail путем вызова emit , точно так же , как в nextTuple() . При отслеживании кортежей каждый должен быть ack ed или сбой ed. В противном случае в топологии в конечном итоге закончится память. Также важно знать, что вы должны делать все это самостоятельно при написании пользовательских носиков и болтов. Но Штормовое ядро может помочь. Например, bolt, реализующий BaseBasicBolt , автоматически выполняет acking. Или встроенные носики для популярных источников данных, таких как Кафка позаботьтесь о логике очереди и воспроизведения после подтверждения и сбоя.

Прощальные выстрелы

Проектирование топологии или кластера Storm всегда сводится к настройке различных имеющихся у нас регуляторов и определению того, где результат кажется оптимальным. Есть несколько вещей, которые помогут в этом процессе, например, использование файла конфигурации для чтения подсказок о параллелизме, количестве рабочих мест и т.д., Чтобы вам не приходилось повторно редактировать и перекомпилировать свой код. Определите свои болты логически, по одному на каждую неделимую задачу, и сделайте их легкими и эффективными. Аналогичным образом, ваши методы spouts nextTuple() должны быть оптимизированы. Эффективно используйте пользовательский интерфейс Storm. По умолчанию он не показывает нам полную картину, только 5% от общего числа выдаваемых кортежей. Чтобы отслеживать все из них, используйте конфигурация.установите частоту дискретизации статистики (1.0d) . Следите за Значения Acks и Latency для отдельных болтов и топологий через пользовательский интерфейс – это то, на что вы хотите обратить внимание при повороте ручек.

Оригинал: “https://dev.to/usamaashraf/playing-with-apache-storm-on-docker—like-a-boss-4bgb”