Рубрики
Без рубрики

Вступление к Apache Storm

Узнайте, как использовать Apache Storm для обработки потоков данных

Автор оригинала: baeldung.

1. Обзор

Этот учебник станет введением в Apache Storm , распределенную вычислительную систему реального времени.

Мы сосредоточимся на этом и рассмотрим:

  • Что такое Apache Storm и какие проблемы он решает
  • Его архитектура, и
  • Как использовать его в проекте

2. Что Такое Apache Storm?

Apache Storm-это бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.

Он обеспечивает отказоустойчивость, масштабируемость и гарантирует обработку данных, а также особенно хорош при обработке неограниченных потоков данных.

Некоторыми хорошими примерами использования Storm могут быть операции с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.

Storm обеспечивает интеграцию с различными базами данных и системами массового обслуживания, доступными на рынке.

3. Зависимость от Maven

Прежде чем мы будем использовать Apache Storm, нам необходимо включить зависимость от ядра шторма в наш проект:


    org.apache.storm
    storm-core
    1.2.2
    provided

Мы должны использовать только предусмотренный объем если мы намерены запустить наше приложение в кластере Storm.

Чтобы запустить приложение локально, мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный .

4. Модель Данных

Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.

4.1. Кортеж

Кортеж – это упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.

Шторм должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, Строки и байтовые массивы.

И поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config , чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:

Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:

Config config = new Config();
config.registerSerialization(User.class);

В таком случае Kryo сериализует класс с помощью FieldSerializer . По умолчанию это приведет к сериализации всех непереходных полей класса, как частных, так и общедоступных.

Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

Чтобы создать пользовательский сериализатор, нам нужно расширить универсальный класс | Сериализатор , который имеет два метода запись и чтение.

4.2. Поток

A Поток является основной абстракцией в экосистеме Storm. | Поток представляет собой неограниченную последовательность кортежей.

Штормы позволяют обрабатывать несколько потоков параллельно.

У каждого потока есть идентификатор, который предоставляется и присваивается во время объявления.

5. Топология

Логика приложения Storm в реальном времени упакована в топологию. Топология состоит из носиков и болтов .

5.1. Носик

Ростки-это источники потоков. Они выделяют кортежи в топологию.

Кортежи можно считывать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.

Ростки могут быть надежными или ненадежными . Надежный означает, что носик может ответить, что кортеж, который не удалось обработать штормом. Ненадежно означает, что носик не отвечает, так как он собирается использовать механизм “огонь и забудь” для выделения кортежей.

Чтобы создать пользовательский носик, нам нужно реализовать интерфейс IRichSpout или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout .

Давайте создадим ненадежный носик:

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

Наш пользовательский RandomIntSpout будет генерировать случайное целое число и метку времени каждую секунду.

5.2. Болт

Болты обрабатывают кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.

Некоторые операции требуют нескольких шагов, и поэтому в таких случаях нам потребуется использовать несколько болтов.

Чтобы создать пользовательский Болт , нам нужно реализовать IRichBolt или для более простых операций Интерфейс IBasicBolt .

Существует также несколько вспомогательных классов, доступных для реализации Bolt. В этом случае мы будем использовать BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

Этот пользовательский Печатающий болт просто выведет все кортежи на консоль.

6. Создание простой топологии

Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.

6.1. Случайное число

В начале мы создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2. Фильтр-болт

Далее мы создадим болт, который будет отфильтровывать все элементы с операцией , равной 0:

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3. Агрегатный болт

Далее, давайте создадим более сложный Болт , который будет объединять все положительные операции с каждого дня.

Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают в Windows, вместо работы с одиночными кортежами: BaseWindowedBolt .

Окна являются важной концепцией в потоковой обработке, разделяющей бесконечные потоки на конечные куски. Затем мы можем применить вычисления к каждому фрагменту. Как правило, существует два типа окон:

Временные окна используются для группировки элементов за заданный период времени с использованием временных меток . Временные окна могут содержать разное количество элементов.

Окна подсчета используются для создания окон определенного размера . В таком случае все окна будут иметь одинаковый размер, и окно не будет отображаться, если элементов меньше определенного размера.

Наш AggregatingBolt сгенерирует сумму всех положительных операций из временного окна вместе с его отметками времени начала и окончания:

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

Обратите внимание, что в этом случае прямое получение первого элемента списка безопасно. Это связано с тем, что каждое окно вычисляется с использованием метки времени поля кортежа, поэтому в каждом окне должен быть по крайней мере один элемент.

6.4. Болт для записи файлов

Наконец, мы создадим болт, который возьмет все элементы с суммой операций больше 2000, сериализует их и запишет в файл:

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

Обратите внимание, что нам не нужно объявлять выходные данные, так как это будет последний болт в нашей топологии

6.5. Запуск топологии

Наконец, мы можем собрать все воедино и запустить нашу топологию:

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    Spout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout");

    Bolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    Bolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    Bolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

Чтобы поток данных проходил через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup позволяет нам указать, что данные для filteringBolt будут поступать из randomNumberSpout .

Для каждого Болта нам нужно добавить Группу перемешивания , которая определяет источник элементов для этого болта. Источником элементов может быть Носик или другой Болт. И если мы установим один и тот же источник для более чем одного болта , источник будет излучать все элементы для каждого из них.

В этом случае наша топология будет использовать Локальный кластер для локального запуска задания.

7. Заключение

В этом уроке мы представили Apache Storm, распределенную вычислительную систему в реальном времени. Мы создали носик, несколько болтов и собрали их вместе в полную топологию.

И, как всегда, все примеры кода можно найти на GitHub .