PySpark is a great tool for performing cluster computing operations in Python. PySpark is based on Apache’s Spark, which is written in Scala. But to support other languages, Spark was also introduced in other programming languages. One of the support extensions is Spark for Python known as PySpark. PySpark has its own set of operations to process Big Data efficiently. The best part of PySpark is, it follows the syntax of Python. Thus, if one has great hands-on experience with Python, it takes no time to understand the practical implementation of PySpark operations schema in data engineering.
This guide will teach us about operations involved in PySpark RDDs and Pair RDDs. Both PySpark RDD and Pair RDDs consist of two types of operations: Transformations and Actions. We will learn more about them in the following lines. These operations are very useful, and since these actions and transformations are in Python, one can easily use these methods.
map()
, filter()
, reduceByKey()
, collect()
, count()
, first()
, take()
, and reduce()
.repartition()
and coalesce()
.flatMap()
operation and its application in transforming RDDs.reduceByKey()
, sortByKey()
, and groupByKey()
.countByKey()
for aggregating data.This article was published as a part of the Data Science Blogathon.
In PySpark, Resilient Distributed Datasets (RDDs) are the fundamental data structure. They are an immutable collection of objects that can be processed in parallel. Two types of operations can be performed on RDDs: transformations and actions.
RDD operations are the operations that are performed on RDDs to process the data. These operations are divided into two types: transformations and actions. Both of these operations are essential in data processing with PySpark.
Transformations are operations on RDDs that return a new RDD. They are lazily evaluated, meaning the execution doesn’t start right away. They are only executed when an action is called. Examples of transformations include map(), filter(), and reduceByKey().
Conversely, actions are operations that return a value to the driver program or write data to an external storage system. Actions trigger the execution of transformations. Examples of actions include count(), first(), take(), and collect().
Transformations shape your data, and actions compute results. Both are vital for processing data in PySpark. Understanding these operations is key to effectively using PySpark for big data processing.
A core data structure of PySpark, RDDs are highly efficient in performing distributed tasks. This article will not involve the basics of PySpark, such as the creation of PySpark RDDs and PySpark DataFrames. If you are unaware of these terms, I highly recommend reading my previous article on PySpark here.
PySpark RDD has a set of operations to accomplish any task. These operations are of two types:
Transformations are a kind of operation that takes an RDD as input and produces another RDD as output. Once a transformation is applied to an RDD, it returns a new RDD; the original RDD remains the same and thus is immutable. After applying the transformation, it creates a Directed Acyclic Graph or DAG for computations and ends after applying any actions on it. This is the reason they are called lazy evaluation processes.
Actions are a kind of operation that is applied on an RDD to produce a single value. These methods are applied on a resultant RDD and produce a non-RDD value, thus removing the laziness of the transformation of RDD.
In Layman’s Terms, Transformations are applied to an RDD to give another RDD. Meanwhile, actions are performed on an RDD to give a non-RDD value.
Repartitioning an RDD increases or decreases the number of partitions in an RDD. In PySpark, you can repartition an RDD using the repartition() function. This function reshuffles the data across the network to create a new set of partitions. Repartitioning is expensive as it involves shuffling all the data across multiple nodes.
In PySpark, you can also use the coalesce()
function to reduce the number of partitions in an RDD. Unlike repartition()
, coalesce()
avoids a full shuffle. If you’re decreasing the number of partitions in an RDD, coalesce()
can be a better choice than repartition()
because it minimizes data shuffling.
The flatMap()
function in PySpark is a transformation operation that applies a function to each element of the RDD and returns a new RDD. The applied function should return multiple output items (as an Iterator). The output items are then flattened and returned as a new RDD.
For example, if you have an RDD of sentences and you want to create an RDD of words, you can use flatMap()
to split each sentence into words.
Each element in an RDD is a record. Records can be of any type. For example, in an RDD of text lines, each line is a record. In an RDD of tuples, each tuple is a record.
Also read: PySpark for Beginners – Take your First Steps into Big Data Analytics (with Code)
The number of partitions in an RDD is the number of chunks the dataset is divided into. You can specify the number of partitions when creating an RDD using the parallelize()
function. You can also change the number of partitions using the repartition()
or coalesce()
function.
The number of partitions is an important factor affecting your PySpark program’s performance. Having too few partitions can lead to low parallelism while having too many partitions can lead to scheduling overhead.
For practice purposes, we will perform all the following operations in Google Colab. To perform the PySpark RDD Operations, we must perform some prerequisites on our local machine. If you are also practicing in your local machine, you can follow the following steps.
In PySpark RDDs, Actions are a kind of operation that returns a value when applied to an RDD. To learn more about Actions, refer to the Spark Documentation here. This is particularly useful in data science and machine learning, where large datasets are processed and analyzed.
The following are some of the essential PySpark RDD Operations that are widely used.
The .collect() action on an RDD returns a list of all the elements of the RDD. It’s a great asset for displaying all the contents of our RDD, especially when dealing with csv data. Let’s understand this with an example:
collect_rdd = sc.parallelize([1,2,3,4,5])
print(collect_rdd.collect())
On executing this code, we get:
Here, we first created an RDD, collect_rdd, using the .parallelize() method of SparkContext. Then, we used the .collect() method on our RDD, which returns the list of all the elements from collect_rdd.
The .count() action on an RDD is an operation that returns the number of elements of our RDD. This helps verify if a correct number of elements are being added to an RDD. Let’s understand this with an example:
count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.count())
Here, we first created an RDD, count_rdd, using the .parallelize() method of SparkContext. Then, we applied the .count() method to our RDD, which returned the number of elements present in our RDD.
The .first() action on an RDD returns the first element from our RDD. This can be helpful when we want to verify if the exact kind of data has been loaded in our RDD as per the requirements. For example, if wanted an RDD with the first 10 natural numbers. We can verify this by checking the first element from our RDD i.e. 1. Let’s understand this with an example:
first_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
print(first_rdd.first())
On executing this code, we get:
Here, we first created an RDD, first_rdd, using the .parallelize() method of SparkContext having the first ten natural numbers. Then, we applied the .first() operation on first_rdd. This returned the first element from first_rdd, i.e. 1.
The .take(n) action on an RDD returns n number of elements from the RDD. The ‘n’ argument takes an integer, which refers to the number of elements we want to extract from the RDD. Let’s understand this with an example:
take_rdd = sc.parallelize([1,2,3,4,5])
print(take_rdd.take(3))
On executing this code, we get:
Here, we first created an RDD, take_rdd, using the .parallelize() method of SparkContext. Then, we applied the .take(3) method on our RDD take_rdd. This returned the first 3 elements in a list from the RDD.
The .reduce() Action takes two elements from the given RDD and operates. This operation is performed using an anonymous function or lambda. For example, if we want to add all the elements from the given RDD, we can use the .reduce() action.
reduce_rdd = sc.parallelize([1,3,4,6])
print(reduce_rdd.reduce(lambda x, y : x + y))
On executing this code, we get:
Here, we created an RDD, reduce_rdd using .parallelize() method of SparkContext. We used the .reduce action on reduce_rdd with an enclosed anonymous function or lambda. Here, the lambda adds all the elements of the given RDD and prints the sum.
The .saveAsTextFile() Action serves the resultant RDD as a text file. We can also specify the path to which the file needs to be saved. This helps save our results, especially when working with a large amount of data.
save_rdd = sc.parallelize([1,2,3,4,5,6])
save_rdd.saveAsTextFile('file.txt')
On executing this code, we get:
Here, we created an RDD, save_rdd using the .parallelize() method of SparkContext. We used the .saveAsTextFile() action on save_rdd to save it into our directory with the name passed as an argument.”
Now, we will see a set of methods: the PySpark operations schema specifically for Pair RDDs. The same set of Actions is perfectly fine for Pair RDDs that had worked for normal RDDs. However, pair RDDs have a unique set of transformation operations and come in handy when we have data in key value pairs.
Since Pair RDDs are created from multiple tuples, we need to use operations that make use of keys and values.
Following are the widely used Transformation on a Pair RDD:
The .reduceByKey() transformation performs multiple parallel processes for each key in the data and combines the values for the same keys. It uses an anonymous function or lambda to perform the task algorithm. Since it’s a transformation, it returns an RDD as a result.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("createDataFrame").getOrCreate()
marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.reduceByKey(lambda x, y: str(x + y)).collect())
On executing this code, we get:
Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of students’ marks. Then, we applied the .groupByKey() transformation on marks_rdd with an anonymous function enclosed inside the .reduceByKey(). This returns a new RDD; thus, we applied the .collect() action to generate the list of resultant elements.
The .sortByKey() transformation sorts the input data by keys from key-value pairs in ascending or descending order. It returns a unique RDD as a result.
marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.sortByKey('ascending').collect())
On executing this code, we get:
Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of students’ marks. We applied the .sortByKey() Transformation on this RDD. We also passed ‘ascending‘ (string) as an argument to the .sortByKey() transformation, which denotes that we want to sort the keys in ascending order. At last, we used the .collect() method on the resultant RDD to get all the result elements as a list.
The .groupByKey() transformation groups all the values in the given data with the same key together. It returns a new RDD as a result. For example, if we want to extract all the Cultural Members from a list of committee members, the .groupByKey() will come in handy to perform the necessary task.
marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.groupByKey().collect()
for key, value in dict_rdd:
print(key, list(map(str, value)))
On executing this code, we get:
Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of students’ marks. Then, we applied the .groupByKey() transformation on the marks_rdd RDD. Then, we used the .collect() action to get the results and saved the results to dict_rdd. Since dict_rdd is a dictionary item type, we applied the for loop on dict_rdd to get a list of marks for each student in each line. We also added list() to the values since we have more than one subject mark for students. The values are converted to strings using the str
function. Finally, we used the toDF
function to convert the RDD to a DataFrame.
df = marks_rdd.toDF()
df.show()
Even though all of the RDD Actions can be performed on Pair RDDs, there is a set of articles that are specifically designed for Pair RDDs. These Actions will not work on normal RDDs and are to be used only on Pair RDDs. Following are the Actions that are widely used for Key-Value type Pair RDD data:
The .countByKey() option counts the values for each key in the given data. This action returns a dictionary, and one can extract the keys and values by iterating over the extracted dictionary using loops. Since we are getting a dictionary. As a result, we can also use the dictionary methods such as .keys(), .values(), and .items(). This is similar to the aggregate function used in Java and Hadoop.
marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.countByKey().items()
for key, value in dict_rdd:
print(key, value)
On executing this code, we get:
Here, we created an RDD, marks_rdd, using the .parallelize() method of SparkContext and added a list of tuples consisting of students’ marks. The .countByKey() action returns the dictionary, we saved the dictionary items into variable dict_rdd. Later, we iterated over these items and got the value count for each key. This counting method is often used in Java and Hadoop to aggregate data.
This guide discussed PySpark RDD Operations, Pair RDD Operations, and Transformations and Actions in PySpark RDD. We went through each operation in detail and provided examples for better understanding. After reading this guide, we hope you’ll be comfortable performing various PySpark RDD Operations and Pair RDD Operations in your projects. Please ask in the comments section below if you have any doubts or queries. We’ll try to resolve them as soon as possible!
Ans. Resilient Distributed Datasets (RDDs) are fundamental data structures of PySpark. They are immutable distributed collections of objects. RDDs can be created through deterministic operations on data in stable storage or other RDDs. They are fault-tolerant, meaning they can recover from node losses.
Ans. RDD is a fundamental data structure of PySpark that provides more control and can handle more complex data types. However, it lacks optimization for performance. On the other hand, DataFrame is an abstraction over RDD. It allows data processing in a tabular form, similar to Pandas DataFrame. DataFrame API has built-in optimization (Catalyst optimizer) and can use Tungsten’s efficient in-memory encoding.
Ans. Yes, Spark RDD is still used, especially when you need fine-grained control over where the data is stored and how it is processed. However, for most use cases, DataFrame and Dataset APIs are recommended due to their built-in optimization.
Ans. You can create an RDD in PySpark using SparkContext.parallelize
the method for a small dataset stored in the driver program or SparkContext.textFile, a method for a large dataset stored in an external storage system. Here is an example: rdd = spark.sparkContext.parallelize([('Richard', 22), ('Alfred', 23), ('Loki',4), ('Albert', 12), ('Alfred', 9)])
.
Ans. Map-reduce operation in PySpark can be performed using map and reduce
actions. The map
transformation applies a function to each element of the RDD and returns a new RDD. The reduce
action reduces the elements of an RDD using a specified method. Here is an example: rdd.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
.
The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.
Lorem ipsum dolor sit amet, consectetur adipiscing elit,