1. Обзор
Этот учебник станет введением в Apache Storm , распределенную вычислительную систему реального времени.
Мы сосредоточимся на этом и рассмотрим:
- Что такое Apache Storm и какие проблемы он решает
- Его архитектура, и
- Как использовать его в проекте
2. Что Такое Apache Storm?
Apache Storm-это бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.
Он обеспечивает отказоустойчивость, масштабируемость и гарантирует обработку данных, а также особенно хорош при обработке неограниченных потоков данных.
Некоторыми хорошими примерами использования Storm могут быть операции с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.
Storm обеспечивает интеграцию с различными базами данных и системами массового обслуживания, доступными на рынке.
3. Зависимость от Maven
Прежде чем мы будем использовать Apache Storm, нам необходимо включить зависимость от ядра шторма в наш проект:
org.apache.storm storm-core 1.2.2 provided
Мы должны использовать только предусмотренный объем если мы намерены запустить наше приложение в кластере Storm.
Чтобы запустить приложение локально, мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный .
4. Модель Данных
Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.
4.1. Кортеж
Кортеж – это упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.
Шторм должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, Строки и байтовые массивы.
И поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config , чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:
Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:
Config config = new Config(); config.registerSerialization(User.class);
В таком случае Kryo сериализует класс с помощью FieldSerializer . По умолчанию это приведет к сериализации всех непереходных полей класса, как частных, так и общедоступных.
Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:
Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);
Чтобы создать пользовательский сериализатор, нам нужно расширить универсальный класс | Сериализатор , который имеет два метода запись и чтение.
4.2. Поток
A Поток является основной абстракцией в экосистеме Storm. | Поток представляет собой неограниченную последовательность кортежей.
Штормы позволяют обрабатывать несколько потоков параллельно.
У каждого потока есть идентификатор, который предоставляется и присваивается во время объявления.
5. Топология
Логика приложения Storm в реальном времени упакована в топологию. Топология состоит из носиков и болтов .
5.1. Носик
Ростки-это источники потоков. Они выделяют кортежи в топологию.
Кортежи можно считывать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.
Ростки могут быть надежными или ненадежными . Надежный означает, что носик может ответить, что кортеж, который не удалось обработать штормом. Ненадежно означает, что носик не отвечает, так как он собирается использовать механизм “огонь и забудь” для выделения кортежей.
Чтобы создать пользовательский носик, нам нужно реализовать интерфейс IRichSpout или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout .
Давайте создадим ненадежный носик:
public class RandomIntSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector outputCollector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
outputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
}
}Наш пользовательский RandomIntSpout будет генерировать случайное целое число и метку времени каждую секунду.
5.2. Болт
Болты обрабатывают кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.
Некоторые операции требуют нескольких шагов, и поэтому в таких случаях нам потребуется использовать несколько болтов.
Чтобы создать пользовательский Болт , нам нужно реализовать IRichBolt или для более простых операций Интерфейс IBasicBolt .
Существует также несколько вспомогательных классов, доступных для реализации Bolt. В этом случае мы будем использовать BaseBasicBolt :
public class PrintingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}Этот пользовательский Печатающий болт просто выведет все кортежи на консоль.
6. Создание простой топологии
Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.
6.1. Случайное число
В начале мы создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:
public class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
int operation = random.nextInt(101);
long timestamp = System.currentTimeMillis();
Values values = new Values(operation, timestamp);
collector.emit(values);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}6.2. Фильтр-болт
Далее мы создадим болт, который будет отфильтровывать все элементы с операцией , равной 0:
public class FilteringBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
int operation = tuple.getIntegerByField("operation");
if (operation > 0) {
basicOutputCollector.emit(tuple.getValues());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}6.3. Агрегатный болт
Далее, давайте создадим более сложный Болт , который будет объединять все положительные операции с каждого дня.
Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают в Windows, вместо работы с одиночными кортежами: BaseWindowedBolt .
Окна являются важной концепцией в потоковой обработке, разделяющей бесконечные потоки на конечные куски. Затем мы можем применить вычисления к каждому фрагменту. Как правило, существует два типа окон:
Временные окна используются для группировки элементов за заданный период времени с использованием временных меток . Временные окна могут содержать разное количество элементов.
Окна подсчета используются для создания окон определенного размера . В таком случае все окна будут иметь одинаковый размер, и окно не будет отображаться, если элементов меньше определенного размера.
Наш AggregatingBolt сгенерирует сумму всех положительных операций из временного окна вместе с его отметками времени начала и окончания:
public class AggregatingBolt extends BaseWindowedBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.outputCollector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
}
@Override
public void execute(TupleWindow tupleWindow) {
List tuples = tupleWindow.get();
tuples.sort(Comparator.comparing(this::getTimestamp));
int sumOfOperations = tuples.stream()
.mapToInt(tuple -> tuple.getIntegerByField("operation"))
.sum();
Long beginningTimestamp = getTimestamp(tuples.get(0));
Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));
Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
outputCollector.emit(values);
}
private Long getTimestamp(Tuple tuple) {
return tuple.getLongByField("timestamp");
}
} Обратите внимание, что в этом случае прямое получение первого элемента списка безопасно. Это связано с тем, что каждое окно вычисляется с использованием метки времени поля кортежа, поэтому в каждом окне должен быть по крайней мере один элемент.
6.4. Болт для записи файлов
Наконец, мы создадим болт, который возьмет все элементы с суммой операций больше 2000, сериализует их и запишет в файл:
public class FileWritingBolt extends BaseRichBolt {
public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
private BufferedWriter writer;
private String filePath;
private ObjectMapper objectMapper;
@Override
public void cleanup() {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close writer!");
}
}
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
try {
writer = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
logger.error("Failed to open a file for writing.", e);
}
}
@Override
public void execute(Tuple tuple) {
int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
long endTimestamp = tuple.getLongByField("endTimestamp");
if (sumOfOperations > 2000) {
AggregatedWindow aggregatedWindow = new AggregatedWindow(
sumOfOperations, beginningTimestamp, endTimestamp);
try {
writer.write(objectMapper.writeValueAsString(aggregatedWindow));
writer.newLine();
writer.flush();
} catch (IOException e) {
logger.error("Failed to write data to file.", e);
}
}
}
// public constructor and other methods
}Обратите внимание, что нам не нужно объявлять выходные данные, так как это будет последний болт в нашей топологии
6.5. Запуск топологии
Наконец, мы можем собрать все воедино и запустить нашу топологию:
public static void runTopology() {
TopologyBuilder builder = new TopologyBuilder();
Spout random = new RandomNumberSpout();
builder.setSpout("randomNumberSpout");
Bolt filtering = new FilteringBolt();
builder.setBolt("filteringBolt", filtering)
.shuffleGrouping("randomNumberSpout");
Bolt aggregating = new AggregatingBolt()
.withTimestampField("timestamp")
.withLag(BaseWindowedBolt.Duration.seconds(1))
.withWindow(BaseWindowedBolt.Duration.seconds(5));
builder.setBolt("aggregatingBolt", aggregating)
.shuffleGrouping("filteringBolt");
String filePath = "./src/main/resources/data.txt";
Bolt file = new FileWritingBolt(filePath);
builder.setBolt("fileBolt", file)
.shuffleGrouping("aggregatingBolt");
Config config = new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Test", config, builder.createTopology());
}Чтобы поток данных проходил через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup позволяет нам указать, что данные для filteringBolt будут поступать из randomNumberSpout .
Для каждого Болта нам нужно добавить Группу перемешивания , которая определяет источник элементов для этого болта. Источником элементов может быть Носик или другой Болт. И если мы установим один и тот же источник для более чем одного болта , источник будет излучать все элементы для каждого из них.
В этом случае наша топология будет использовать Локальный кластер для локального запуска задания.
7. Заключение
В этом уроке мы представили Apache Storm, распределенную вычислительную систему в реальном времени. Мы создали носик, несколько болтов и собрали их вместе в полную топологию.
И, как всегда, все примеры кода можно найти на GitHub .