Internal Architecture of PySpark

Apache Spark is a fast, distributed computing system for working with large amounts of data efficiently. PySpark, the Python API for Apache Spark, lets you write Spark applications in Python, a high-level language that is popular among data scientists and engineers. In this article, we’ll explore the internal architecture of PySpark and how it executes code.

PySpark Architecture Overview

Before diving into the execution process, it’s important to understand the basic architecture of PySpark. PySpark follows a driver/executor architecture that resembles a client-server model: the driver program acts as the client, and the executor processes running on the worker nodes act as the servers. The driver program communicates with the cluster manager (e.g. YARN, Mesos, or Spark’s standalone manager) to allocate resources for executing PySpark jobs.
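
For example, here is a minimal sketch of how a driver program names its cluster manager when it creates a SparkSession. The appName and the local[*] master are purely illustrative; on a real cluster you would pass yarn or a spark:// standalone URL instead.

from pyspark.sql import SparkSession

# The master setting tells the driver which cluster manager to request resources from;
# 'local[*]' runs the driver and executors inside a single local JVM, which is handy for testing.
spark = SparkSession.builder \
    .appName('architecture_demo') \
    .master('local[*]') \
    .getOrCreate()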

PySpark execution consists of the following components:

  1. Driver Program: It’s the program that manages the overall execution of the PySpark application. It is responsible for defining the job, creating RDDs (Resilient Distributed Datasets), and transforming RDDs using operations such as map and filter.
  2. RDDs: PySpark’s core data structure is the RDD (Resilient Distributed Dataset), which is an immutable distributed collection of objects. RDDs can be created from external data sources or by transforming existing RDDs.
  3. Transformations: Transformations are operations that produce a new RDD from an existing one. Examples include map, filter, and reduceByKey. Transformations are lazily evaluated, meaning that they are only executed when an action is called.
  4. Actions: Actions are operations that trigger the computation of RDDs. Examples include count, collect, and reduce.
  5. Executors: Executors are processes running on the worker nodes that perform the actual computation on RDDs. They receive tasks from the driver program and execute them on data stored in memory or on disk.
  6. Cluster Manager: The cluster manager is responsible for allocating resources to the executor processes. It ensures that the executor processes have enough memory, CPU, and other resources to execute the tasks assigned to them.
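
As a rough sketch, the snippet below maps onto these components (it reuses the hypothetical spark session from above, and the data is made up): the code runs in the driver program, parallelize() creates an RDD, map and filter are lazy transformations, and count is the action that makes the executors do the actual work.

sc = spark.sparkContext                                  # entry point for RDDs, held by the driver

numbers = sc.parallelize(range(1, 1001), numSlices=8)    # an RDD split into 8 partitions
squares = numbers.map(lambda x: x * x)                   # transformation (lazy, nothing runs yet)
large_squares = squares.filter(lambda x: x > 500_000)    # another lazy transformation

print(large_squares.count())                             # action: tasks run on executors, result returns to the driver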

How PySpark Executes Code

When you write a PySpark application, the driver program sends tasks to the executors to execute on the data stored in RDDs. The following steps outline the process that PySpark follows to execute a PySpark application:

  1. The driver program creates RDDs using data from external sources or by transforming existing RDDs.
  2. The driver program defines transformations on RDDs, but these transformations are not executed immediately. Instead, PySpark builds a lineage of transformations that define how to construct each RDD from its parent RDDs.
  3. The driver program defines actions on RDDs, which trigger the computation of RDDs. When an action is called, PySpark follows the lineage of transformations to compute the RDD and return the result to the driver program.
  4. The driver program sends tasks to the executor processes to perform the computation on RDDs.
  5. The executor processes receive the tasks from the driver program and execute them on the data stored in memory or disk.
  6. The results of the computation are returned to the driver program.
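
The laziness in steps 2 and 3 is easy to observe from the driver: defining transformations only records lineage, and nothing is read or computed until an action runs. A small illustrative sketch (the file name and the sc context are hypothetical):

lines = sc.textFile('application.log')                 # nothing is read yet
errors = lines.filter(lambda line: 'ERROR' in line)    # lineage recorded, still nothing computed

# toDebugString() shows the lineage that will be used once an action is called
# (it returns bytes in recent PySpark versions, hence the decode)
print(errors.toDebugString().decode('utf-8'))

print(errors.count())                                  # only now are tasks sent to the executors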

PySpark’s execution model allows it to perform operations on large datasets by breaking them into smaller chunks and processing them in parallel across multiple nodes in the cluster. By leveraging the power of distributed computing, PySpark can efficiently process large datasets that would be impossible to process on a single machine.
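
The “smaller chunks” are the partitions of an RDD or DataFrame. As a brief sketch, the partitioning of the hypothetical RDD from earlier can be inspected and changed like this:

print(large_squares.getNumPartitions())        # how many tasks each stage over this RDD will use

redistributed = large_squares.repartition(16)  # spread the data across 16 partitions (triggers a shuffle)
print(redistributed.getNumPartitions())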

Job, Stage & Task

In PySpark, there are three main concepts related to the execution of tasks: jobs, stages, and tasks.

Job: A job is a set of parallel computations triggered by a single action in PySpark. It represents a logical unit of work that needs to be performed on a dataset. For example, if you need to count the number of times a specific word appears in a text file, the counting action you call creates one job.

Stage: A stage is a subset of a job that can be executed without shuffling data across the cluster. PySpark splits a job into stages at shuffle boundaries; within a stage, the transformations are pipelined and run in parallel across the partitions. In the word-count example, computing per-partition counts forms one stage, and combining those counts after the shuffle forms another.

Task: A task is the smallest unit of work that can be executed in PySpark. PySpark breaks down a stage into multiple tasks, where each task performs the same set of transformations on a different partition of the data. In the word-count example, one task is launched for each partition of the text file.
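
To tie the three together, here is a minimal word-count sketch (the file words.txt and the sc context are hypothetical): the single action at the end triggers one job, the shuffle introduced by reduceByKey splits that job into two stages, and each stage runs one task per partition.

lines = sc.textFile('words.txt', minPartitions=4)        # 4 partitions => 4 tasks per stage

counts = (lines.flatMap(lambda line: line.split())       # stage 1: narrow transformations,
               .map(lambda word: (word, 1))              # pipelined within each partition
               .reduceByKey(lambda a, b: a + b))         # shuffle boundary: the job gets a second stage

result = counts.collect()                                # the action that triggers the job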

Execution Plan in PySpark

PySpark generates the execution plan using its Catalyst optimizer, which applies rule-based (and, where applicable, cost-based) optimizations to transform the logical plan into a physical plan. The physical plan is then executed as a directed acyclic graph (DAG) of stages and tasks.

The DAG represents the flow of data from input sources to output sinks: each node is an operation or transformation applied to the data, and each edge is the data dependency between two operations.

Let’s take an example to understand the execution plan generation process in PySpark. Suppose we have a CSV file containing data about a retail store’s sales transactions. We want to analyze the data to find out the total revenue generated by each product.

  1. Loading the Data: The first step is to load the data from the CSV file. We can do this using the csv() method of the DataFrameReader, accessed as spark.read on the SparkSession object.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('sales_analysis').getOrCreate()

sales_data = spark.read.csv('sales_data.csv', header=True, inferSchema=True)
  2. Transforming the Data: Next, we need to transform the data to get the total revenue generated by each product. We can do this by grouping the data by product and summing up the revenue for each group.
from pyspark.sql.functions import sum

product_revenue = sales_data.groupBy('product').agg(sum('revenue').alias('total_revenue'))
  3. Creating the Execution Plan: Now, let’s look at the execution plan for this transformation. We can use the explain() method to see the physical plan.
product_revenue.explain()

Output:

== Physical Plan ==
*(2) HashAggregate(keys=[product#0], functions=[sum(revenue#2)])
+- Exchange hashpartitioning(product#0, 200), true, [id=#32]
   +- *(1) HashAggregate(keys=[product#0], functions=[partial_sum(revenue#2)])
      +- *(1) FileScan csv [product#0,revenue#2] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/path/to/sales_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product:string,revenue:int>

The output shows the physical plan, which Spark executes as a DAG of two stages. The first stage, marked *(1), scans the CSV file and computes a partial sum of revenue per product within each partition. The Exchange operation then shuffles the rows so that all records for the same product land in the same partition, and the second stage, marked *(2), combines the partial sums into the final total for each product.

The HashAggregate operations perform the aggregation, and the Exchange operation shuffles the data between the two stages.
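
The 200 in hashpartitioning(product#0, 200) is the default number of shuffle partitions, controlled by the spark.sql.shuffle.partitions setting. As a quick sketch, it can be tuned on the session (the value 50 below is just an illustration):

# Fewer shuffle partitions can be faster for small datasets; 50 is an arbitrary example value
spark.conf.set('spark.sql.shuffle.partitions', 50)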

  4. DAG Visualization: We can also inspect the lineage behind this DataFrame by converting it to an RDD and calling the toDebugString() method, which returns the chain of dependent RDDs in a human-readable format.

# DataFrame.rdd exposes the underlying RDD; toDebugString() returns the lineage
# (as bytes in recent PySpark versions, hence the decode)
print(product_revenue.rdd.toDebugString().decode('utf-8'))

The lineage mirrors the physical plan above: the data is read from the CSV file, partially aggregated within each partition, shuffled by product, and then aggregated into the final totals. The Exchange (shuffle) is what separates the two stages.

Before the physical plan is produced, PySpark uses the Catalyst optimizer to improve the plan by applying a series of optimization rules. Catalyst applies a set of predefined rules to the logical plan and also makes cost-based choices when selecting a physical plan, all in order to optimize query performance.

The optimization process consists of several stages, including:

  1. Analysis: In this stage, PySpark analyzes the query and builds an abstract syntax tree (AST) that represents the logical plan. PySpark also performs semantic analysis to validate the query and resolve any ambiguities.
  2. Logical Optimization: In this stage, PySpark applies a set of logical optimization rules to the AST to optimize the query. These rules include things like predicate pushdown, projection pruning, and constant folding.
  3. Physical Planning: In this stage, PySpark generates a physical plan from the optimized logical plan. PySpark uses cost-based optimization to choose the most efficient physical plan based on the available resources and the characteristics of the data.
  4. Code Generation: In this stage, PySpark generates the actual code that will be executed on the cluster. Spark uses whole-stage code generation to speed up execution, compiling the operators within each stage into specialized Java code instead of interpreting them one at a time.

Once the execution plan has been optimized, PySpark creates a directed acyclic graph (DAG) that represents the execution plan. The DAG is a set of stages, where each stage corresponds to a set of tasks that can be executed in parallel. The tasks in each stage are determined by the data dependencies between the operations in the plan.
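
These planning stages can be observed for the earlier product_revenue DataFrame by asking explain() for its extended output, which prints the parsed, analyzed, and optimized logical plans alongside the physical plan:

# extended=True prints every planning stage, not just the physical plan
product_revenue.explain(extended=True)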

Conclusion

In this article, we explored the internal architecture of PySpark and how it executes code. We saw that PySpark uses a client-server architecture, where the driver program acts as a client and the executor processes run on worker nodes as servers. PySpark’s execution model allows it to perform operations on large datasets efficiently by breaking them into smaller chunks and processing them in parallel.
