Python - How to use Object Storage with Apache Spark on the Data Processing platform
OpenStack Swift and its S3-compatible API are a common way to store the data you want to use for your Apache Spark jobs. Let's find out how to do it in Python!
Last updated 20th January, 2021
This guide gives you some basic examples of using OpenStack Swift and its S3 API with OVHcloud Data Processing.
We will use both the OpenStack Swift native API and its S3 API, in read/write mode.
The samples are based on the well-known WordCount: we first read data from a text file, then count the frequency of each word in that file.
Find below the code in Python that reads the input file from the Swift container and counts the occurrences of each word.
In this sample, wordcount.txt should be uploaded to the same container as the wordcount.py file. Data Processing then downloads all the files in this container into the cluster, so wordcount.txt is accessible in the Python code as a local file.
Save it in a file called 'wordcount.py' or download it from this repository: Data Processing Samples - Object Storage [1].
Here we use Apache Spark and the OpenStack Swift API.
from __future__ import print_function

from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # create a SparkSession
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    # read the input file directly from the same Swift container that hosts the current script
    # create an RDD that contains the lines of the input file
    lines = spark.read.text("wordcount.txt").rdd.map(lambda r: r[0])

    # split the lines, extract the words and count the number of occurrences of each of them
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)

    # print the result
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    # very important: stop the current session
    spark.stop()
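For reference, the same count can also be expressed with the DataFrame API instead of RDDs. This variant is not part of the original sample; it is a minimal sketch that assumes the same wordcount.txt input and an already created SparkSession named spark:

from pyspark.sql.functions import col, explode, split

# assumes 'spark' is an existing SparkSession, as created above
# split each line into words, then count the occurrences of each word
df = spark.read.text("wordcount.txt")
counts_df = df.select(explode(split(col("value"), " ")).alias("word")) \
    .groupBy("word") \
    .count()
counts_df.show()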
Find below the code in Python that reads the input file through the S3 API, counts the occurrences of each word, and writes the result back through the S3 API.
Save it in a file called 'wordcount_s3only.py' or download it from this repository: Data Processing Samples - Object Storage [1].
The credentials provided for S3 can refer to any OVHcloud Object Storage container. In this sample, you can upload the wordcount.txt file to any Object Storage container or account; it does not have to be the same container as the one holding the source code. You just need to generate the proper access key and secret key for that container. (See How to create EC2 credentials for more details.)
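Before submitting the Spark job, you can quickly check that your EC2 credentials work against the S3 endpoint. This is a minimal sketch, not part of the original sample; the keys, endpoint and bucket name odp-s3 are placeholders to replace with your own values:

import boto3

# hypothetical sanity check: list the first few objects in the target bucket
s3 = boto3.client('s3',
                  aws_access_key_id="7decf61921524a6b828c9305a77bb201",
                  aws_secret_access_key="9e9c50f2ff514fc3bdc5f98e61bec81f",
                  endpoint_url="https://s3.gra.cloud.ovh.net/",
                  region_name="gra")
response = s3.list_objects_v2(Bucket="odp-s3", MaxKeys=5)
for obj in response.get('Contents', []):
    print(obj['Key'])

Once the keys work, the Spark sample below can use them.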
from __future__ import print_function

from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # create a SparkSession
    # we want to use the Swift S3 API, so we have to provide some attributes
    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .config("spark.hadoop.fs.s3a.access.key", "7decf61921524a6b828c9305a77bb201") \
        .config("spark.hadoop.fs.s3a.secret.key", "9e9c50f2ff514fc3bdc5f98e61bec81f") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.endpoint", "s3.gra.cloud.ovh.net") \
        .getOrCreate()

    # read the input file from Swift through the S3 API
    # create an RDD that contains the lines of the input file
    lines = spark.read.text("s3a://odp-s3/wordcount.txt").rdd.map(lambda r: r[0])

    # split the lines, extract the words and count the number of occurrences of each of them
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)

    # store the result in the same (or another) bucket of the same project,
    # according to the attributes provided in the SparkSession
    # note that saveAsTextFile writes a directory of part files under this name
    counts.saveAsTextFile("s3a://odp-s3/wordcount_result.txt")

    # print the result
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    # very important: stop the current session
    spark.stop()
Find below the code in Python that reads the input file from the Swift container as a local file and writes the result through the S3 API.
In this sample, wordcount.txt should be uploaded to the same container as the wordcount_both.py file. Data Processing then downloads all the files in this container into the cluster, so wordcount.txt is accessible in the Python code as a local file.
Save it in a file called 'wordcount_both.py' or download it from this repository: Data Processing Samples - Object Storage [1].
The credentials provided for S3 can refer to any OVHcloud Object Storage container. In this sample, you need to generate an S3 access key and secret key for the destination container in which you would like to write the result. (See How to create EC2 credentials for more details.) You can thus read data from your Swift container and write the result to another container.
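If the source and destination buckets need different credentials, Hadoop's S3A connector supports per-bucket configuration (Hadoop 2.8 and later). Below is a minimal sketch, assuming hypothetical bucket names src-bucket and dst-bucket and a cluster with a recent enough Hadoop; the keys are placeholders:

from pyspark.sql import SparkSession

# per-bucket S3A credentials: each bucket gets its own access/secret key
# bucket names and keys below are placeholders to replace with your own
spark = SparkSession \
    .builder \
    .appName("PythonWordCount") \
    .config("spark.hadoop.fs.s3a.bucket.src-bucket.access.key", "SRC_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.bucket.src-bucket.secret.key", "SRC_SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.bucket.dst-bucket.access.key", "DST_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.bucket.dst-bucket.secret.key", "DST_SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.gra.cloud.ovh.net") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

The sample below uses a single set of credentials, which is enough when both containers belong to the same S3 project.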
from __future__ import print_function

from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # create a SparkSession
    # we want to use the Swift S3 API, so we have to provide some attributes
    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .config("spark.hadoop.fs.s3a.access.key", "7decf61921524a6b828c9305a77bb201") \
        .config("spark.hadoop.fs.s3a.secret.key", "9e9c50f2ff514fc3bdc5f98e61bec81f") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.endpoint", "s3.gra.cloud.ovh.net") \
        .getOrCreate()

    # read the input file directly from the same Swift container that hosts the current script
    # create an RDD that contains the lines of the input file
    lines = spark.read.text("wordcount.txt").rdd.map(lambda r: r[0])

    # split the lines, extract the words and count the number of occurrences of each of them
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)

    # store the result in the same (or another) container according to the attributes
    # provided in the SparkSession; here we use the S3 API
    counts.saveAsTextFile("s3a://odp-s3/wordcount_result.txt")

    # print the result
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    # very important: stop the current session
    spark.stop()
Boto3 is the Amazon Web Services (AWS) Software Development Kit (SDK) for Python, which allows Python developers to write software that makes use of services like Amazon S3 and Amazon EC2. See Boto3 - the AWS SDK for Python.
Find below the code in Python that stores a basic string in an S3 object.
Save it in a file called 'boto3_sample.py' or download it from this repository: Data Processing Samples - Object Storage [1].
Credentials provided for S3 can concern any OVHcloud Object Storage container.
Here we don't use Spark: depending on your use case, you may sometimes have to interact with S3 outside the Apache Spark framework.
import boto3
import io

if __name__ == "__main__":
    # here we use boto3 to access the container through the Swift S3 API,
    # so we have to define some attributes to provide access to the Swift container
    s3 = boto3.client('s3',
                      aws_access_key_id="7decf61921524a6b828c9305a77bb201",
                      aws_secret_access_key="9e9c50f2ff514fc3bdc5f98e61bec81f",
                      endpoint_url="https://s3.gra.cloud.ovh.net/",
                      region_name="gra")

    bytes_to_write = b"This is a string to be written in a file."
    file = io.BytesIO(bytes_to_write)

    # write the string through the S3 API with boto3
    s3.upload_fileobj(file, "odp-s3", "test_write_string.txt")
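To verify the write, you can read the object back with boto3. This is a short sketch under the same assumptions as above (the odp-s3 bucket, keys and endpoint are placeholders to adapt):

import boto3

# reuse the same credentials and endpoint as in boto3_sample.py
s3 = boto3.client('s3',
                  aws_access_key_id="7decf61921524a6b828c9305a77bb201",
                  aws_secret_access_key="9e9c50f2ff514fc3bdc5f98e61bec81f",
                  endpoint_url="https://s3.gra.cloud.ovh.net/",
                  region_name="gra")

# fetch the object written above and print its content
response = s3.get_object(Bucket="odp-s3", Key="test_write_string.txt")
print(response['Body'].read().decode('utf-8'))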
[1] Please select the tag related to the Spark version you want to use.
These samples are quite basic. They give you a first step towards interacting with Object Storage from within your code, from which you can go further.
Concerning the 'WordCount' use case, here is a link to the tutorial with a more advanced WordCount.
If you are not familiar with Apache Spark, we recommend visiting Apache Spark's official website and PySpark's documentation.
You can send your questions, suggestions or feedback to our community of users on our Discord, in the channel #dataprocessing-spark.