Java - How to use OVHcloud Object Storage with Apache Spark on the Data Processing platform

OpenStack Swift and its S3-compatible API are a common way to store the data you want to use in your Apache Spark jobs. Let's find out how to use them in Java!

Last updated 20th January, 2021

Objective

This guide gives you a basic example of using OpenStack Swift and its S3 API with OVHcloud Data Processing in Java.

We will use the OpenStack S3 API to read and write data to OVHcloud Object Storage.

The samples are based on the well-known WordCount example. We first read data from a text file and count the occurrences of each word in that file. We then print the result in the job's output log and write it to a text file in OVHcloud Swift Object Storage.

Requirements

Read/Write data with Apache Spark using OpenStack Swift S3 API in Java

The Java code below:

  • reads the 'novel.txt' object from OVHcloud Object Storage through its S3 API
  • counts the occurrences of each word in the file
  • stores the results in OVHcloud Object Storage through its S3 API
  • prints the results in the output log of the job

This Java code reads the novel.txt object, which has been uploaded to a container named textfile, and prints the number of occurrences of each word in the job's output logs. As mentioned in the requirements, we created a container named textfile and uploaded the novel.txt object to it.

You need to create a jar file from your Java code and upload it to your OVHcloud Object Storage as well. The jar file and novel.txt can be uploaded to separate containers, or even to different cloud projects or OVHcloud accounts. The program also writes the result under the name result.txt in the same container as novel.txt. Note that Spark's saveAsTextFile treats result.txt as an output path and stores the counts in one or more part files under it, rather than in a single object.
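Before looking at the full Spark job, the counting step itself can be tried locally without Spark or Object Storage. The following is a minimal sketch in plain Java, not part of the original sample: the class name LocalWordCount and the method countWords are hypothetical names chosen for illustration. It mirrors the same idea as the job (split each line on a single space, then sum a count of 1 per word).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper class, used only to illustrate the counting logic locally.
final class LocalWordCount {
    // Same separator as the Spark job: a single space
    private static final Pattern SPACE = Pattern.compile(" ");

    // Splits every line into words and sums one occurrence per word.
    // A TreeMap keeps the words sorted so the printed output is stable.
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(SPACE.split(line)))
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        countWords(lines).forEach((word, count) -> System.out.println(word + ": " + count));
    }
}
```

As in the Spark job, splitting on a single space means that consecutive spaces produce empty "words", and punctuation or letter case is not normalized.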

JavaWordCount.java :

import org.apache.hadoop.conf.Configuration;
import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
    // Words are separated by a single space
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        // S3 credentials and object locations: replace these with your own values
        String myAccessKey = "7decf61921524a6b828c9305a77bb201";
        String mySecretKey = "9e9c50f2ff514fc3bdc5f98e61bec81f";
        String bucket = "textfile";
        String filepath = "novel.txt";
        String filepath_result = "result.txt";

        SparkSession spark = SparkSession
                .builder()
                .appName("JavaWordCount")
                .getOrCreate();

        // Configure the Hadoop S3A connector to use the OVHcloud S3 endpoint
        Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
        hadoopConf.set("fs.s3a.access.key", myAccessKey);
        hadoopConf.set("fs.s3a.secret.key", mySecretKey);
        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoopConf.set("fs.s3a.endpoint", "s3.gra.cloud.ovh.net");
        hadoopConf.set("fs.s3a.path.style.access", "true");

        // Read the object, split each line into words, then sum a count of 1 per word
        JavaRDD<String> lines = spark.read().textFile("s3a://" + bucket + "/" + filepath).javaRDD();
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // Write the counts to Object Storage, then print them in the job's output log
        counts.saveAsTextFile("s3a://" + bucket + "/" + filepath_result);
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        spark.stop();
    }
}
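The job above hardcodes the access key and secret key in the source. As a hedged sketch (not part of the original sample), the S3A settings can be gathered in one small helper and the keys read from the environment instead. The class name S3aSettings, its methods, and the environment variable names S3_ACCESS_KEY and S3_SECRET_KEY are assumptions made for illustration; whether and how your job receives environment variables depends on how you submit it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the same fs.s3a.* properties that the job sets.
final class S3aSettings {

    // Returns the S3A properties used by the job, in insertion order.
    static Map<String, String> build(String accessKey, String secretKey, String endpoint) {
        if (accessKey == null || accessKey.isEmpty() || secretKey == null || secretKey.isEmpty()) {
            throw new IllegalArgumentException("S3 access key and secret key must be provided");
        }
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.s3a.access.key", accessKey);
        settings.put("fs.s3a.secret.key", secretKey);
        settings.put("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        settings.put("fs.s3a.endpoint", endpoint);
        settings.put("fs.s3a.path.style.access", "true");
        return settings;
    }

    // Builds the s3a:// URI of an object from its container (bucket) and path.
    static String objectUri(String bucket, String objectPath) {
        return "s3a://" + bucket + "/" + objectPath;
    }

    // Reads an environment variable, falling back to a placeholder when it is not set.
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        // Assumed variable names; placeholders keep the demo runnable without them
        String accessKey = envOrDefault("S3_ACCESS_KEY", "<your-access-key>");
        String secretKey = envOrDefault("S3_SECRET_KEY", "<your-secret-key>");

        Map<String, String> settings = build(accessKey, secretKey, "s3.gra.cloud.ovh.net");
        // Print the settings without revealing the keys
        settings.forEach((key, value) ->
                System.out.println(key + " = " + (key.endsWith(".key") ? "****" : value)));
        System.out.println(objectUri("textfile", "novel.txt"));
    }
}
```

In the Spark job, each entry of the returned map would then be applied with hadoopConf.set(key, value), and objectUri would replace the string concatenations passed to textFile and saveAsTextFile.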

One way to package this Java code into a jar file is to create a pom.xml file and build the project with Maven, using the command mvn package. You can use this pom.xml file, for example:

pom.xml :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ovh.dataprocessing</groupId>
    <artifactId>sparkwordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>3.2.2</spark.version>
        <hadoop.aws.version>2.8.5</hadoop.aws.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <scope>provided</scope> 
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <scope>provided</scope> 
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <scope>provided</scope> 
            <version>${hadoop.aws.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedClassifierName>fat</shadedClassifierName>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>JavaWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

Everything in the OVHcloud Object Storage container to which you uploaded your code will be downloaded to the Data Processing cluster. If you have a large volume of data, the best practice is to put it in a separate Object Storage container.

You can find the source code of this project in the OVHcloud GitHub repository: ovh/data-processing-samples [1]

Notes

[1] Please select the tag related to the Spark version you want to use.

Go further

These samples are quite basic. They provide a first step for interacting with Object Storage from within your code, from which you can go further.

Concerning the 'WordCount' use case, here is a link to a more advanced tutorial about Wordcount.

If you are not familiar with Apache Spark, we recommend that you visit Apache Spark's official website.

You can send your questions, suggestions or feedback to our community of users on our Discord, in the channel #dataprocessing-spark.

