Рубрики
Без рубрики

Apache Spark: Различия между фреймами данных, наборами данных и R DDs

Изучите три основные концепции Spark: фреймы данных, наборы данных и RDDS.

Автор оригинала: Aaron Juarez.

1. Обзор

Apache Spark – это быстрая распределенная система обработки данных. Он выполняет обработку данных в памяти и использует кэширование в памяти и оптимизированное выполнение, что обеспечивает высокую производительность. Он предоставляет высокоуровневые API для популярных языков программирования, таких как Scala, Python, Java и R.

В этом кратком руководстве мы рассмотрим три основные концепции Spark: фреймы данных, наборы данных и RDDS.

2. Фрейм данных

Spark SQL представил абстракцию табличных данных, называемую фреймом данных, начиная с Spark 1.3. С тех пор она стала одной из самых важных функций в Spark. Этот API полезен, когда мы хотим обрабатывать структурированные и полуструктурированные распределенные данные.

В разделе 3 мы обсудим Устойчивые распределенные наборы данных (RDD). Фреймы данных хранят данные более эффективно, чем RDDS, это связано с тем, что они используют неизменяемые, встроенные, устойчивые, распределенные и параллельные возможности RDDS, но они также применяют схему к данным. Фреймы данных также преобразуют SQL-код в оптимизированные низкоуровневые операции RDD.

Мы можем создавать фреймы данных тремя способами:

  • Преобразование существующих RDDs
  • Выполнение SQL-запросов
  • Загрузка внешних данных

Команда Spark представила Spark Session в версии 2.0 она объединяет все различные контексты, гарантируя, что разработчикам не нужно будет беспокоиться о создании различных контекстов:

SparkSession session = SparkSession.builder()
  .appName("TouristDataFrameExample")
  .master("local[*]")
  .getOrCreate();

DataFrameReader dataFrameReader = session.read();

Мы будем анализировать файл Tourist.csv :

Dataset data = dataFrameReader.option("header", "true")
  .csv("data/Tourist.csv");

Поскольку фрейм данных Spark 2.0 стал Dataset типа Row , мы можем использовать фрейм данных в качестве псевдонима для Dataset .

Мы можем выбрать конкретные столбцы, которые нас интересуют. Мы также можем фильтровать и группировать по заданному столбцу:

data.select(col("country"), col("year"), col("value"))
  .show();

data.filter(col("country").equalTo("Mexico"))
  .show();

data.groupBy(col("country"))
  .count()
  .show();

3. Наборы данных

Набор данных-это набор строго типизированных структурированных данных . Они обеспечивают знакомый объектно-ориентированный стиль программирования, а также преимущества безопасности типов, поскольку наборы данных могут проверять синтаксис и обнаруживать ошибки во время компиляции.

Dataset является расширением фрейма данных, поэтому мы можем рассматривать фрейм данных как нетипизированное представление набора данных.

Команда Spark выпустила API Dataset в Spark 1.6, и, как они уже упоминали: “Цель наборов данных Spark-предоставить API, который позволяет пользователям легко выражать преобразования в доменах объектов, а также обеспечивает преимущества производительности и надежности механизма выполнения SQL Spark”.

Во-первых, нам нужно будет создать класс типа Туристические данные :

public class TouristData {
    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;
    // ... getters and setters
}

Чтобы сопоставить каждую из наших записей с указанным типом, нам нужно будет использовать кодер. Кодеры переводят между объектами Java и внутренним двоичным форматом Spark :

// SparkSession initialization and data load
Dataset responseWithSelectedColumns = data.select(col("region"), 
  col("country"), col("year"), col("series"), col("value").cast("double"), 
  col("footnotes"), col("source"));

Dataset typedDataset = responseWithSelectedColumns
  .as(Encoders.bean(TouristData.class));

Как и в случае с фреймом данных, мы можем фильтровать и группировать по определенным столбцам:

typedDataset.filter((FilterFunction) record -> record.getCountry()
  .equals("Norway"))
  .show();

typedDataset.groupBy(typedDataset.col("country"))
  .count()
  .show();

Мы также можем выполнять такие операции, как фильтрация по столбцу, соответствующему определенному диапазону, или вычисление суммы определенного столбца, чтобы получить его общее значение:

typedDataset.filter((FilterFunction) record -> record.getYear() != null 
  && (Long.valueOf(record.getYear()) > 2010 
  && Long.valueOf(record.getYear()) < 2017)).show();

typedDataset.filter((FilterFunction) record -> record.getValue() != null 
  && record.getSeries()
    .contains("expenditure"))
    .groupBy("country")
    .agg(sum("value"))
    .show();

4. RDDs

Устойчивый распределенный набор данных или RDD является основной абстракцией программирования Spark. Он представляет собой набор элементов, который является: неизменяемым, устойчивым и распределенным .

RDD инкапсулирует большой набор данных, Spark автоматически распределит данные, содержащиеся в RDDS, по нашему кластеру и распараллелит операции, которые мы выполняем с ними .

Мы можем создавать RDDS только с помощью операций с данными в стабильном хранилище или операций с другими RDDS.

Отказоустойчивость имеет важное значение, когда мы имеем дело с большими наборами данных и данные распределяются на кластерных машинах. RDDs устойчивы благодаря встроенной механике восстановления неисправностей Spark. Spark опирается на тот факт, что RDDS запоминают, как они были созданы, чтобы мы могли легко проследить происхождение, чтобы восстановить раздел .

Существует два типа операций, которые мы можем выполнять с RDDs: Преобразования и действия .

4.1. Преобразования

Мы можем применить преобразования к RDD, чтобы манипулировать его данными. После выполнения этой манипуляции мы получим совершенно новый RDD, так как RDD являются неизменяемыми объектами .

Мы проверим, как реализовать Карту и фильтр, два наиболее распространенных преобразования.

Во-первых, нам нужно создать JavaSparkContext и загрузить данные в виде RDD из файла Tourist.csv :

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
  .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD tourists = sc.textFile("data/Tourist.csv");

Затем давайте применим функцию карты, чтобы получить название страны из каждой записи и преобразовать название в верхний регистр. Мы можем сохранить этот вновь созданный набор данных в виде текстового файла на диске:

JavaRDD upperCaseCountries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1].toUpperCase();
}).distinct();

upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

Если мы хотим выбрать только определенную страну, мы можем применить функцию фильтра к нашим исходным туристам RDD:

JavaRDD touristsInMexico = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));

touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Действия

Действия вернут конечное значение или сохранят результаты на диск после выполнения некоторых вычислений с данными.

Два из используемых в настоящее время действий в Spark-это Подсчет и Сокращение.

Давайте подсчитаем общее количество стран в нашем CSV-файле:

// Spark Context initialization and data load
JavaRDD countries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1];
}).distinct();

Long numberOfCountries = countries.count();

Теперь мы рассчитаем общие расходы по странам. Нам нужно будет отфильтровать записи, содержащие расходы в их описании.

Вместо использования JavaRDD мы будем использовать JavaPairRDD . Пара RDD-это тип RDD, который может хранить пары ключ-значение . Давайте проверим это в следующий раз:

JavaRDD touristsExpenditure = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));

JavaPairRDD expenditurePairRdd = touristsExpenditure
  .mapToPair(line -> {
      String[] columns = line.split(COMMA_DELIMITER);
      return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});

List> totalByCountry = expenditurePairRdd
  .reduceByKey((x, y) -> x + y)
  .collect();

5. Заключение

Подводя итог, мы должны использовать фреймы данных или наборы данных, когда нам нужны доменные API, нам нужны выражения высокого уровня, такие как агрегация, сумма или SQL-запросы. Или когда мы хотим обеспечить безопасность типов во время компиляции.

С другой стороны, мы должны использовать RDDS, когда данные неструктурированы и нам не нужно реализовывать определенную схему или когда нам нужны низкоуровневые преобразования и действия.

Как всегда, все примеры кода доступны на GitHub .