Java - How to use OVHcloud Object Storage with Apache Spark on the Data Processing platform

OpenStack Swift, with its S3-compatible API, is a common way to store the data you want to use in your Apache Spark jobs. Let's find out how to do it in Java!

Last updated 20th January, 2021

Objective

This guide gives a basic example of using OpenStack Swift and its S3 API with OVHcloud Data Processing in Java.

We will use the OpenStack S3 API to read and write data to OVHcloud Object Storage.

The samples are based on the well-known WordCount example. We first read data from a text file, then count the occurrences of each word in that file. Finally, we print the result to the output log and also write it to a text file in OVHcloud Swift Object Storage.
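To make the counting step concrete before Spark is involved, here is a minimal local sketch of the same idea in plain Java. It is not part of the original sample: the class name LocalWordCount, the method countWords and the input sentences are assumptions chosen for illustration, and the data is held in memory instead of being read from Object Storage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper class, used only to illustrate the WordCount logic locally.
final class LocalWordCount {
    // Same separator as the Spark sample: a single space character.
    private static final Pattern SPACE = Pattern.compile(" ");

    private LocalWordCount() {
    }

    // Splits every line on single spaces and counts how often each token appears.
    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                // Equivalent of emitting (word, 1) and summing the values per key.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative in-memory input instead of an object read through the S3 API.
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        countWords(lines).forEach((word, count) -> System.out.println(word + ": " + count));
    }
}
```

The Spark version performs the same split and sum, but distributes the work across the cluster and reads its input from Object Storage.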

Requirements

Read/Write data with Apache Spark using OpenStack Swift S3 API in Java

The Java code below:

  • reads the 'novel.txt' object from OVHcloud Object Storage through its S3 API
  • counts the occurrences of each word in the file
  • stores the results in OVHcloud Object Storage through its S3 API
  • prints the results in the output log of the job

This Java code reads a single object, novel.txt, stored in a container named textfile, and prints the number of occurrences of each word in the output log of the job. As mentioned in the requirements, we created a container named textfile and uploaded the novel.txt object into it.
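The container and object names are combined into an s3a:// path before being handed to Spark. The sketch below shows that composition in isolation. The class S3aPaths and the method objectUri are hypothetical names introduced for this example and are not part of Spark, Hadoop or the original sample.

```java
// Hypothetical helper, shown only to illustrate how the s3a:// path is formed.
final class S3aPaths {
    private S3aPaths() {
    }

    // Builds "s3a://<container>/<objectKey>", dropping one leading slash from the key if present.
    static String objectUri(String container, String objectKey) {
        if (container == null || container.isEmpty()) {
            throw new IllegalArgumentException("container must not be empty");
        }
        if (objectKey == null || objectKey.isEmpty()) {
            throw new IllegalArgumentException("objectKey must not be empty");
        }
        String key = objectKey.startsWith("/") ? objectKey.substring(1) : objectKey;
        return "s3a://" + container + "/" + key;
    }

    public static void main(String[] args) {
        // Names taken from this guide: container "textfile", object "novel.txt".
        System.out.println(objectUri("textfile", "novel.txt"));
    }
}
```

With the names used in this guide, the resulting path is the one the sample passes to Spark when it reads the input object.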

You need to create a jar file from your Java code and upload it to your OVHcloud Object Storage as well. This jar file and novel.txt can be uploaded to separate containers, or even to different cloud projects or OVHcloud accounts. The program also writes the result to result.txt in the same container as novel.txt. Note that Spark's saveAsTextFile writes its output as a set of part files under the result.txt path rather than as a single object.

JavaWordCount.java:

import org.apache.hadoop.conf.Configuration;
import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        // Sample values: replace them with your own S3 credentials, container and object names.
        String myAccessKey = "7decf61921524a6b828c9305a77bb201";
        String mySecretKey = "9e9c50f2ff514fc3bdc5f98e61bec81f";
        String bucket = "textfile";
        String filepath = "novel.txt";
        String filepath_result = "result.txt";
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaWordCount")
                .getOrCreate();

        // Configure the Hadoop S3A connector to reach OVHcloud Object Storage.
        Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
        hadoopConf.set("fs.s3a.access.key", myAccessKey);
        hadoopConf.set("fs.s3a.secret.key", mySecretKey);
        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoopConf.set("fs.s3a.endpoint", "s3.gra.cloud.ovh.net");
        hadoopConf.set("fs.s3a.path.style.access", "true");

        // Read the object line by line, split each line into words, then sum a count of 1 per word.
        JavaRDD<String> lines = spark.read().textFile("s3a://" + bucket + "/" + filepath).javaRDD();
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // Write the counts to Object Storage, then print them in the output log of the job.
        counts.saveAsTextFile("s3a://" + bucket + "/" + filepath_result);
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        spark.stop();
    }
}
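One detail of the code above is worth illustrating: the SPACE pattern matches a single space character, so consecutive spaces yield empty tokens and tabs are not treated as separators. The following sketch compares that behaviour with a whitespace-based split. The class Tokenizers and both method names are assumptions made for this example. They are not part of the original sample, and whether you need the stricter variant depends on your input text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper comparing two ways of splitting a line into words.
final class Tokenizers {
    private static final Pattern SPACE = Pattern.compile(" ");
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    private Tokenizers() {
    }

    // Same split as the sample: one token boundary per single space character.
    static List<String> splitOnSingleSpace(String line) {
        return Arrays.asList(SPACE.split(line));
    }

    // Alternative: split on runs of whitespace and drop empty tokens.
    static List<String> splitOnWhitespace(String line) {
        List<String> tokens = new ArrayList<>();
        for (String token : WHITESPACE.split(line.trim())) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        String line = "a  b\tc";
        System.out.println(splitOnSingleSpace(line).size() + " tokens with the single-space split");
        System.out.println(splitOnWhitespace(line).size() + " tokens with the whitespace split");
    }
}
```

If empty tokens appear in your word counts, replacing the SPACE pattern in the Spark job with a whitespace-based split along these lines is one possible adjustment.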

One way to package this Java code into a jar file is to create a pom.xml file and build the project with Maven using the command mvn package. For example, you can use this pom.xml file:

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ovh.dataprocessing</groupId>
    <artifactId>sparkwordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>3.2.2</spark.version>
        <hadoop.aws.version>2.8.5</hadoop.aws.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <scope>provided</scope> 
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <scope>provided</scope> 
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <scope>provided</scope> 
            <version>${hadoop.aws.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedClassifierName>fat</shadedClassifierName>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>JavaWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Everything in the OVHcloud Object Storage container to which you uploaded your code will be downloaded to the Data Processing cluster. If you have a large volume of data, the best practice is to keep it in a separate Object Storage container.

You can find the source code of this project in the OVHcloud GitHub repository: ovh/data-processing-samples [1]

Notes

[1] Please select the tag related to the Spark version you want to use.

Go further

These samples are quite basic. They offer a starting point for interacting with Object Storage from your code, which you can then build on.

For the 'WordCount' use case, a more advanced WordCount tutorial is also available.

If you are not familiar with Apache Spark, we recommend visiting Apache Spark's official website.

You can send your questions, suggestions or feedback to our community of users on our Discord, in the channel #dataprocessing-spark

