Рубрики
Без рубрики

Пример Apache Spark: Программа подсчета слов на Java

Пример Apache Spark, Программа подсчета слов Apache Spark на Java, Пример Java Apache Spark, Учебник по Apache Spark, пример кода интеграции java apache spark.

Автор оригинала: Pankaj Kumar.

Apache Spark

Apache Spark-это платформа обработки данных с открытым исходным кодом, которая может выполнять аналитические операции с большими данными в распределенной среде. Это был академический проект в Калифорнийском университете в Беркли, и первоначально он был начат Матеем Захарией в AMPLab Калифорнийского университета в Беркли в 2009 году. Apache Spark был создан поверх инструмента управления кластерами, известного как Mesos. Позже он был изменен и обновлен, чтобы он мог работать в кластерной среде с распределенной обработкой.

Пример настройки проекта Apache Spark

Мы будем использовать Maven для создания образца проекта для демонстрации. Чтобы создать проект, выполните следующую команду в каталоге, который вы будете использовать в качестве рабочей области:

mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Если вы запускаете maven в первый раз, выполнение команды generate займет несколько секунд, потому что maven должен загрузить все необходимые плагины и артефакты, чтобы выполнить задачу генерации.

После того, как вы создали проект, не стесняйтесь открывать его в своей любимой среде разработки. Следующим шагом является добавление соответствующих зависимостей Maven в проект. Вот это pom.xml файл с соответствующими зависимостями:



    
    
        org.apache.spark
        spark-core_2.11
        1.4.0
    

    
        junit
        junit
        4.11
        test
    




    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            2.0.2
            
                1.8
                1.8
            
        
        
            org.apache.maven.plugins
            maven-jar-plugin
            
                
                    
                        true
                        lib/
                        com.geekcap.javaworld.sparkexample.WordCount
                    
                
            
        
        
            org.apache.maven.plugins
            maven-dependency-plugin
            
                
                    copy
                    install
                    
                        copy-dependencies
                    
                    
                        ${project.build.directory}/lib
                    
                
            
        
    

Поскольку это проект на основе maven, на самом деле нет необходимости устанавливать и настраивать Apache Spark на вашем компьютере. Когда мы запустим этот проект, будет запущен экземпляр среды выполнения Apache Spark, и как только программа завершит выполнение, она будет завершена.

Наконец, чтобы понять все банки, которые добавляются в проект при добавлении этой зависимости, мы можем запустить простую команду Maven, которая позволяет нам видеть полное дерево зависимостей для проекта, когда мы добавляем в него некоторые зависимости. Вот команда, которую мы можем использовать:

mvn dependency:tree

Когда мы запустим эту команду, она покажет нам следующее дерево зависимостей:

shubham:JD-Spark-WordCount shubham$ mvn dependency:tree

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] |  +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] |  |     +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] |  |     +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |     \- org.objenesis:objenesis:jar:1.2:compile
[INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] |  |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  |  +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] |  |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  |  +- commons-io:commons-io:jar:2.1:compile
[INFO] |  |  |  +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] |  |  |  +- commons-lang:commons-lang:jar:2.5:compile
[INFO] |  |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] |  |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] |  |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] |  |  |  |  +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] |  |  |  |  |  +- com.google.inject:guice:jar:3.0:compile
[INFO] |  |  |  |  |  |  +- javax.inject:javax.inject:jar:1:compile
[INFO] |  |  |  |  |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO] |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] |  |  |  |  |  |  |  +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] |  |  |  |  |  |  |  \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] |  |  |  |  |  |  \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |     \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] |  |  |  |  |  |     |        \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] |  |  |  |  |  +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] |  |  |  |  |  |  |  \- stax:stax-api:jar:1.0.1:compile
[INFO] |  |  |  |  |  |  +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] |  |  |  |  |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |  |  |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] |  |  |  |  |  |  \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] |  |  |  |  |  \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] |  |  |  |  \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] |  |  \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] |  +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] |  |  +- commons-codec:commons-codec:jar:1.3:compile
[INFO] |  |  \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] |  |  +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] |  |  |  \- jline:jline:jar:0.9.94:compile
[INFO] |  |  \- com.google.guava:guava:jar:14.0.1:compile
[INFO] |  +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] |  +- commons-net:commons-net:jar:2.2:compile
[INFO] |  +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] |  |  +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
[INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] |  |  +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] |  +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] |  +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] |  |  \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] |  |     +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] |  |     \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] |  |        \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] |  |           +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] |  |           \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] |  |  \- asm:asm:jar:3.1:compile
[INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] |  +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] |  +- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] |  |  \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] |  +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] |  +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO]    \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------

Всего с двумя добавленными зависимостями Spark собрал все необходимые зависимости в проекте, который включает в себя зависимости Scala, а также Apache Spark, написанный на самом Scala.

Создание входного файла

Поскольку мы собираемся создать программу-счетчик слов, мы создадим образец входного файла для вашего проекта в корневом каталоге нашего проекта с именем input.txt. Поместите в него любой контент, мы используем следующий текст:

Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.

Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.

Не стесняйтесь использовать любой текст в этом файле.

Структура проекта

Прежде чем мы продолжим и начнем работать над кодом для проекта, давайте представим здесь структуру проекта, которая у нас будет, как только мы закончим добавлять весь код в проект:

Структура проекта

Создание счетчика слов

Теперь мы готовы приступить к написанию нашей программы. Когда вы начинаете работать с программами больших данных, импорт может создать много путаницы. Чтобы избежать этого, вот все импортные материалы, которые мы будем использовать в нашем проекте:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

Далее, вот структура нашего класса, которую мы будем использовать:

package com.journaldev.sparkdemo;

...imports...

public class WordCounter {

    private static void wordCount(String fileName) {
        ...
    }

    public static void main(String[] args) {
        ...
    }
}

Вся логика будет лежать внутри метода подсчет слов . Мы начнем с определения объекта для класса SparkConf . Объект этого класса используется для задания различных параметров Spark в качестве пар ключ-значение для программы. Мы предоставляем только простые параметры:

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

master указывает local, что означает, что эта программа должна подключаться к потоку Spark, запущенному на локальном хосте . Имя приложения – это просто способ предоставить Spark метаданные приложения. Теперь мы можем создать объект SparkContext с помощью этого объекта конфигурации:

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

Spark рассматривает каждый ресурс, который он обрабатывает, как RDD (Устойчивые распределенные наборы данных), что помогает ему организовывать данные в структуру данных поиска, которая гораздо эффективнее поддается анализу. Теперь мы преобразуем входной файл в сам объект JavaRDD :

JavaRDD inputFile = sparkContext.textFile(fileName);

Теперь мы будем использовать API Java 8 для обработки файла JavaRDD и разделения слов, содержащихся в файле, на отдельные слова:

JavaRDD wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

Опять же, мы используем Java 8 mapToPair(...) метод для подсчета слов и предоставления слова, числа пары, которые могут быть представлены в качестве выходных данных:

JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

Теперь мы можем сохранить выходной файл в виде текстового файла:

countData.saveAsTextFile("CountData");

Наконец, мы можем предоставить точку входа в нашу программу с помощью метода main() :

public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("No files provided.");
        System.exit(0);
    }
    wordCount(args[0]);
}

Полный файл выглядит следующим образом:

package com.journaldev.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

    private static void wordCount(String fileName) {

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        JavaRDD inputFile = sparkContext.textFile(fileName);

        JavaRDD wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

        JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

        countData.saveAsTextFile("CountData");
    }

    public static void main(String[] args) {

        if (args.length == 0) {
            System.out.println("No files provided.");
            System.exit(0);
        }

        wordCount(args[0]);
    }
}

Теперь мы перейдем к запуску этой программы с использованием самого Maven.

Запуск приложения

Чтобы запустить приложение, зайдите в корневой каталог программы и выполните следующую команду:

mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"

В этой команде мы предоставляем Maven полное имя основного класса, а также имя входного файла. Как только эта команда будет выполнена, мы увидим, что в нашем проекте создается новый каталог:

Выходной каталог проекта

Когда мы откроем каталог и файл с именем “part-00000.txt” внутри него его содержимое выглядит следующим образом:

Вывод счетчика слов

Вывод

На этом уроке мы увидели, как мы можем использовать Apache Spark в проекте на основе Maven для создания простой, но эффективной программы для счетчиков слов. Прочитайте больше сообщений о больших данных, чтобы получить более глубокие знания о доступных инструментах и системах обработки больших данных.

Загрузите исходный код