Рубрики
Без рубрики

Написание Spark: Scala Vs Java

Рассмотрим некоторые различия между написанием заданий Spark на Scala и Java. Помеченный spark, scala, java.

Я присоединился к команде в начале апреля 2019 года. Они писали Spark jobs для выполнения ряда различных задач в Scala. В то время я знал только Java, очень мало Scala и почти ничего не знал о Spark. Перенесемся в сегодняшний день и теперь у меня есть несколько месяцев опыта работы в Scala и Spark.

Совсем недавно меня включили в проект, который сканирует таблицу HBase, выполняет несколько действий с данными и записывает их в другую таблицу HBase. В Scala все просто, верно? Несколько недель спустя я выполнил свою первую работу. Развеваются красные флаги. Он был написан на Scala, а не на Java! Scala еще не был принят в качестве языка, в который компания хочет “инвестировать”. Почти никто не знает языка и не может поддерживать работу, как только она будет передана. Мне нужно переписать задание на Java и вот это приводит меня к этому самому сообщению в блоге.

Смысл этого поста в блоге состоит в том, чтобы описать мои испытания и невзгоды при написании задания Spark на Java. Хотя, да, вы абсолютно можете написать задание Spark на Java, вы также должны посмотреть, насколько МЕНЬШЕ вам придется писать, если вместо этого вы можете использовать Scala. Этот пост также может быть использован в качестве аргумента в ваших аргументах в пользу того, почему ваша компания должна инвестировать в Scala, если вы используете Spark в любом качестве. Кроме того, полное раскрытие информации, могут быть абсолютно ЛУЧШИЕ способы написания следующего кода Java при сравнении его со Scala. Так что, если вы видите, что что-то сделано плохо, не стесняйтесь оставлять комментарии и обучать меня лучшему способу.

А теперь давайте перейдем к сути сообщения…

искра

Сразу же, если ваша компания использует Spark, они должны иметь возможность поддерживать использование Scala. Искра была написана в Scala . Если вы с нетерпением ждете новой функции в Spark, есть шанс, что вы получите эту функцию сначала в Scala, а затем и на других языках.

Искра Оболочка – еще одна важная причина. Оболочка Spark – это, по сути, Scala REPL, который позволяет вам взаимодействовать с Spark API. Знаете ли вы, что вам нужно просканировать таблицу HBase, но вы не можете придумать правильный синтаксис для этого? Откройте оболочку Spark, введите какой-нибудь Scala, и вы довольно быстро узнаете, находитесь ли вы на правильном пути или нет.

С Java нет простого способа. По моему опыту, это было поискать ответ в Интернете, написать его, упаковать код в JAR, перенести его туда, где я могу загрузить его в наш кластер, и запустить (надеясь на лучшее).

Кодирование

Ладно, хватит общих подробностей. Давайте углубимся в некоторый код и выясним, где, я думаю, вы столкнетесь с наибольшей болью при написании заданий Spark на Java.

Фреймы данных и наборы данных

Фреймы данных – это данные, организованные в именованные столбцы. Они представляют собой неизменяемый распределенный набор данных.

Наборы данных являются расширением API DataFrame. Он обеспечивает типобезопасный интерфейс ООП.

Подробнее об этих двух можно найти здесь .

Итак, допустим, вы только что отсканировали таблицу HBase, и у вас есть объект строк таблицы с именем rows . Вы пытаетесь получить конкретные данные из этой таблицы (на основе семейства и квалификатора) и поместить их в набор данных. Как бы вы это сделали?

Скала

val rowsDS = 
  rows.map(kv => (Bytes.toString(kv._1.get()),
    Bytes.toString(kv._2.getValue(Bytes.toBytes("family"), Bytes.toBytes("qualifier"))))
  ).toDS()

Как по волшебству, .точки () метод – это все, что вам нужно.

А как насчет Java?

Java, предполагая, что у вас есть JavaPairRDD с именем rowRDD уже настроен с вашими результатами сканирования. | | переменная spark здесь находится переменная

Dataset rowsDS = spark.createDataFrame(rowRDD, MyRowSchema.class);

Круто! Тоже одна строчка! Подождите, что это за второй параметр? Зачем вам нужен класс? Что ж, это необходимо для применения определенной схемы к вашему RDD, чтобы преобразовать ее в набор данных. Помните, выше я говорил, что наборы данных типобезопасны? Итак, необходим дополнительный шаг, такой как отображение ваших результатов rowRDD в объект типа MyRowSchema .

JavaRDD javaRDD = rowRDD.map((Function, String>) tuple -> {
Result result = tuple._2;

return new MyRowSchema(Bytes.toString(result.getValue(Bytes.toBytes("family"), Bytes.toBytes("qualifier"))));

});

Dataset rowsDS = spark.createDataFrame(javaRDD, MyRowSchema.class);

Из-за шаблона Java требуется написать гораздо больше кода и пройти еще несколько обручей через createDataFrame в Java, чем с помощью простого .dots () метод, который вы получаете в Scala.

Преобразования

Преобразования – это отличный способ связать пользовательские преобразования в цепочку, например, добавить столбец данных в набор данных. Смотрите это потрясающее сообщение в блоге для получения более подробной информации о преобразованиях.

Допустим, вы хотите получить всех людей во фрейме данных с именем “Райан” и их соответствующим возрастом… Scala позволяет вам сначала определить свои методы, а затем связать их вместе.

def retrieveAllRyansAge()(df: DataFrame): DataFrame = {
  df.filter(col("name").startsWith("Ryan"))
    .select("name", "age")
}
val ryansAgeDF = peoplesAgeDF
  .transform(RyanTransformer.retrieveAllRyansAge)

И вы закончили!

Если вы следили за этим до сих пор, вы можете догадаться, что эквивалент Java будет немного длиннее…

public static Function1, Dataset> retrieveAllRyansAge = new AbstractFunction1, Dataset>() {
    @Override
    public Dataset apply(Dataset peopleDS) {
        return retrieveRyansWithAge(peopleDS);
    }
};

private static Dataset retrieveRyansWithAge(Dataset df) {
    return df.filter(col("name").startsWith("Ryan"))
            .select("name", "age");
}

Конечно, я добавил дополнительный метод для разделения фактической работы, чтобы вы теоретически могли использовать вышеупомянутый метод.

И теперь фактическое преобразование

Dataset ryansAgeDF = peoplesAgeDF
        .transform(RyanTransformer.retrieveAllRyansAge);
Определяемые Пользователем Функции

Еще одним удобным элементом для написания заданий Spark являются пользовательские функции (UDFs). Это функции, написанные пользователем, как вы могли бы догадаться по названию, в тех случаях, когда встроенные функции не способны выполнять то, что необходимо.

Например, если вам нужно преобразовать 3-буквенный код языка в 2-буквенный код языка, например, “eng” в “en”. Вы могли бы написать что-то вроде приведенного ниже в Scala.

def getFormattedLanguage(lang: String): String = {

  Option(lang).map({
    l =>
      l.replaceAll(" ", "").toLowerCase match {
        case "eng" => "en"
        case "fre" => "fr"
        case _ => "en"
      }
  }).getOrElse("en")
}

val getFormattedLanguageUDF = udf[String, String] (getFormattedLanguage)

val formattedLanguages = languagesDF.withColumn("formattedLanguage", LanguageFormatter.getFormattedLanguageUDF(col("language")))

В Java существует специальный способ вызова UDFs. Что-то вроде:

spark.udf().register(UDF_FORMATTED_LANG, (UDF1) LanguageFormatter::getFormattedLanguage, DataTypes.StringType);

Dataset formattedLanguages = languagesDF.withColumn("formattedLanguage", callUDF(UDF_FORMATTED_LANG, col("language")));

Где UDF_FORMATTED_LANG – это строка с именем UDF.

Модульное тестирование

Последняя тема, которую я хотел затронуть, которая также близка и дорога моему сердцу, – это модульное тестирование кода, который вы пишете для своей работы Spark, будь то на Java или Scala.

Взяв приведенный выше пример, который я написал для преобразований, если бы я хотел извлечь всех райанов и их возраст из фрейма данных и подсчитать, сколько у меня было в тесте, это выглядело бы так, как показано ниже:

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local")
      .appName("spark test example")
      .getOrCreate()
  }
}

Приведенная выше черта необходима для нижеприведенного класса:

class NameAndAgeExtractorTest extends FunSuite with BeforeAndAfter with SparkSessionTestWrapper {

  test("Ryan retrieval returns empty data frame when no Ryans 
  found") {
   val sourceDF = Seq(
      ("Bryan", "21", "M")
    ).toDF("name", "age", "gender")

   val actualDF = 
     sourceDF.transform(RyanTransformer.retrieveAllRyansAge())

   assert(actualDF.count() == 0)
  }

}

Обратите внимание, как я смог создать фрейм данных в нескольких строках кода? А затем смог вызвать метод преобразования с помощью метода, который я хотел протестировать, и получить свой ответ в двух других строках кода?

Теперь перейдем к Яве…

class NameAndAgeExtractorTest {

@BeforeEach
void setup() {
    spark = SparkSession.builder()
             .appName("Tests")
             .master("local")
             .getOrCreate();

    javaSparkContext = new 
             JavaSparkContext(spark.sparkContext());

    namesSchema = DataTypes.createStructType(new StructField[]{
            DataTypes.createStructField("name", 
              DataTypes.StringType, false),
            DataTypes.createStructField("age", 
              DataTypes.StringType, false),
            DataTypes.createStructField("gender", 
              DataTypes.StringType, false)
    });
}
}

Здесь у нас есть только метод setup . Мы должны определить схему для вашего фрейма данных, на которую будет дана ссылка в приведенном ниже тесте. В отличие от Scala, где мы можем динамически создавать фрейм данных без схемы, Java требует, чтобы мы придерживались типов, и поэтому создает больше кода, который мы должны написать, чтобы протестировать один маленький метод, который взаимодействует с фреймом данных.

@Test
void retrieveAllRyans_returns_empty_dataset_when_no_Ryans_found() {
    List people = new ArrayList<>();
    people.add(new String[]{"Bryan", "21", "M"});

    JavaRDD rowRDD = javaSparkContext.parallelize(people).map(RowFactory::create);

    Dataset df = spark.createDataFrame(rowRDD, namesSchema);

    Dataset resultDf = df.transform(RyanTransformer.retrieveAllRyansAge);

    Assertions.assertEquals(0L, resultDf.count());
}

Есть еще несколько шагов, которые мы должны пройти, прежде чем протестировать наш метод. Во-первых, создайте список и добавьте в него элементы. Во-вторых, создайте JavaRDD чтобы соответствовать тому, что нужно Java. Наконец, мы можем создать фрейм данных и теперь использовать наш метод и подтвердить наш результат.

Теперь это относится только к одному элементу и к одному фрейму данных. Представьте себе код, который нам понадобится, если нам понадобится более одного элемента, и что произойдет, если нам понадобятся разные типы фреймов данных. Вы смотрите на две строки кода для Scala по сравнению с десятками строк в Java.

Оглядываясь назад на темы, которые я затронул:

  1. Фреймы данных и наборы данных
  2. Преобразования
  3. Определяемые Пользователем Функции
  4. Модульное тестирование

Эти темы – это то, что я считаю очень важным при написании заданий Spark. Если вы уберете что-нибудь из этого поста в блоге, я надеюсь, вы остановитесь и подумаете о направлении вашей будущей работы в Spark. Опять же, вы абсолютно можете написать их на Java (и написать их хорошо). Но дополнительное время, которое вам потребуется на написание шаблона, и некоторая сложность обручей, которые вам придется преодолеть, когда Scala предложит вам это в меньшем количестве строк, в конечном итоге будут стоить вам дорого.

Спасибо за чтение!

Оригинал: “https://dev.to/stylestrip/writing-spark-scala-vs-java-49p4”