1. Обзор
Этот учебник станет введением в Apache Storm , распределенную вычислительную систему реального времени.
Мы сосредоточимся на этом и рассмотрим:
- Что такое Apache Storm и какие проблемы он решает
- Его архитектура, и
- Как использовать его в проекте
2. Что Такое Apache Storm?
Apache Storm-это бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.
Он обеспечивает отказоустойчивость, масштабируемость и гарантирует обработку данных, а также особенно хорош при обработке неограниченных потоков данных.
Некоторыми хорошими примерами использования Storm могут быть операции с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.
Storm обеспечивает интеграцию с различными базами данных и системами массового обслуживания, доступными на рынке.
3. Зависимость от Maven
Прежде чем мы будем использовать Apache Storm, нам необходимо включить зависимость от ядра шторма в наш проект:
org.apache.storm storm-core 1.2.2 provided
Мы должны использовать только предусмотренный объем если мы намерены запустить наше приложение в кластере Storm.
Чтобы запустить приложение локально, мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный .
4. Модель Данных
Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.
4.1. Кортеж
Кортеж – это упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.
Шторм должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, Строки и байтовые массивы.
И поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config , чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:
Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:
Config config = new Config(); config.registerSerialization(User.class);
В таком случае Kryo сериализует класс с помощью FieldSerializer . По умолчанию это приведет к сериализации всех непереходных полей класса, как частных, так и общедоступных.
Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:
Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);
Чтобы создать пользовательский сериализатор, нам нужно расширить универсальный класс | Сериализатор , который имеет два метода запись и чтение.
4.2. Поток
A Поток является основной абстракцией в экосистеме Storm. | Поток представляет собой неограниченную последовательность кортежей.
Штормы позволяют обрабатывать несколько потоков параллельно.
У каждого потока есть идентификатор, который предоставляется и присваивается во время объявления.
5. Топология
Логика приложения Storm в реальном времени упакована в топологию. Топология состоит из носиков и болтов .
5.1. Носик
Ростки-это источники потоков. Они выделяют кортежи в топологию.
Кортежи можно считывать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.
Ростки могут быть надежными или ненадежными . Надежный означает, что носик может ответить, что кортеж, который не удалось обработать штормом. Ненадежно означает, что носик не отвечает, так как он собирается использовать механизм “огонь и забудь” для выделения кортежей.
Чтобы создать пользовательский носик, нам нужно реализовать интерфейс IRichSpout или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout .
Давайте создадим ненадежный носик:
public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }
Наш пользовательский RandomIntSpout будет генерировать случайное целое число и метку времени каждую секунду.
5.2. Болт
Болты обрабатывают кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.
Некоторые операции требуют нескольких шагов, и поэтому в таких случаях нам потребуется использовать несколько болтов.
Чтобы создать пользовательский Болт , нам нужно реализовать IRichBolt или для более простых операций Интерфейс IBasicBolt .
Существует также несколько вспомогательных классов, доступных для реализации Bolt. В этом случае мы будем использовать BaseBasicBolt :
public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
Этот пользовательский Печатающий болт просто выведет все кортежи на консоль.
6. Создание простой топологии
Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.
6.1. Случайное число
В начале мы создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:
public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }
6.2. Фильтр-болт
Далее мы создадим болт, который будет отфильтровывать все элементы с операцией , равной 0:
public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }
6.3. Агрегатный болт
Далее, давайте создадим более сложный Болт , который будет объединять все положительные операции с каждого дня.
Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают в Windows, вместо работы с одиночными кортежами: BaseWindowedBolt .
Окна являются важной концепцией в потоковой обработке, разделяющей бесконечные потоки на конечные куски. Затем мы можем применить вычисления к каждому фрагменту. Как правило, существует два типа окон:
Временные окна используются для группировки элементов за заданный период времени с использованием временных меток . Временные окна могут содержать разное количество элементов.
Окна подсчета используются для создания окон определенного размера . В таком случае все окна будут иметь одинаковый размер, и окно не будет отображаться, если элементов меньше определенного размера.
Наш AggregatingBolt сгенерирует сумму всех положительных операций из временного окна вместе с его отметками времени начала и окончания:
public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { Listtuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }
Обратите внимание, что в этом случае прямое получение первого элемента списка безопасно. Это связано с тем, что каждое окно вычисляется с использованием метки времени поля кортежа, поэтому в каждом окне должен быть по крайней мере один элемент.
6.4. Болт для записи файлов
Наконец, мы создадим болт, который возьмет все элементы с суммой операций больше 2000, сериализует их и запишет в файл:
public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }
Обратите внимание, что нам не нужно объявлять выходные данные, так как это будет последний болт в нашей топологии
6.5. Запуск топологии
Наконец, мы можем собрать все воедино и запустить нашу топологию:
public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }
Чтобы поток данных проходил через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup позволяет нам указать, что данные для filteringBolt будут поступать из randomNumberSpout .
Для каждого Болта нам нужно добавить Группу перемешивания , которая определяет источник элементов для этого болта. Источником элементов может быть Носик или другой Болт. И если мы установим один и тот же источник для более чем одного болта , источник будет излучать все элементы для каждого из них.
В этом случае наша топология будет использовать Локальный кластер для локального запуска задания.
7. Заключение
В этом уроке мы представили Apache Storm, распределенную вычислительную систему в реальном времени. Мы создали носик, несколько болтов и собрали их вместе в полную топологию.
И, как всегда, все примеры кода можно найти на GitHub .