Apache Spark is a popular big data processing framework that can be seamlessly integrated with cloud data platforms like Azure, AWS and GCP. Here’s how to read and write data in Spark for CSV, JSON, Parquet and Delta Lake formats.
How to Read a CSV File in Spark
To read CSV in Spark, first create a DataFrameReader and set a number of options: df=spark.read.format("csv").option("header","true").load(filePath)
Then, choose to read CSV files in Spark using either inferSchema or a custom schema:
- inferSchema:
df=spark.read.format("csv").option("inferSchema","true").load(filePath)
- Custom Schema:
csvSchema = StructType([StructField("id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)
Spark Read CSV Format Syntax
To read a CSV file, you must first create a DataFrameReader and set a number of options.
df=spark.read.format("csv").option("header","true").load(filePath)
Here, we load a CSV file and tell Spark that the file contains a header row. This step triggers a Spark job. A Spark job is a block of parallel computation made up of tasks, and one is triggered every time we are physically required to touch the data. In this case, the DataFrameReader has to peek at the first line of the file to figure out how many columns of data the file contains.
When reading data, you always need to consider how the data types will be determined. There are two ways to handle this in Spark: inferSchema or a custom schema.
Reading CSV Using InferSchema
df=spark.read.format("csv").option("inferSchema","true").load(filePath)
The inferSchema option tells the reader to infer data types from the source file. This requires an additional pass over the file, so two Spark jobs are triggered. It's an expensive operation because Spark must go through the entire CSV file to infer the type of each column.
Reading CSV Using a Custom Schema
The preferred option while reading any file would be to enforce a custom schema. This ensures that the data types are consistent and avoids any unexpected behavior.
In order to do that, you first declare the schema to be enforced, and then read the data by passing it to the schema method.
from pyspark.sql.types import StructType, StructField, IntegerType

csvSchema = StructType([StructField("id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)
As a result of predefining the schema for your data, you avoid triggering any jobs: Spark doesn't need to peek into the file because we took care of the schema ourselves. This is possible thanks to lazy evaluation, which is a crucial optimization technique in Spark.
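For files with more than one column, the same pattern simply scales up. Here's a minimal sketch, assuming a hypothetical CSV with id, name and signup_date columns (the column names are placeholders, not from the example above):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Hypothetical columns: a non-nullable id, plus name and signup_date
csvSchema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("signup_date", DateType(), True),
])

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(csvSchema) \
    .load(filePath)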
How to Write CSV Data in Apache Spark
Writing data in Spark is fairly simple. As covered in the core syntax, to write data out we need a dataFrame with actual data in it, through which we can access the DataFrameWriter.
df.write.format("csv").mode("overwrite).save(outputPath/file.csv)
Here, we write the contents of the data frame into a CSV file. Setting the write mode to overwrite will completely overwrite any data that already exists in the destination.
We'd expect the previous command to produce a single CSV file as output. Instead, the "file" we intended to write turns out to be a folder with numerous files within it, which we can confirm by peeking into the contents of outputPath.
%fs ls /outputPath/file.csv
This is an important aspect of Spark's distributed engine: the number of output files reflects the number of partitions in our dataFrame at the time we write it out. The number of files generated would be different if we had repartitioned the dataFrame before writing it out.
Partitioning simply means dividing a large data set into smaller chunks, or partitions. In Spark, partitions are the basic units of parallelism, and they also let you control where data is stored as you write it.
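To see how partitioning affects the output, here's a minimal sketch using the same df and outputPath as above (the sub-directory names are placeholders):

# Coalesce to a single partition before writing so the output folder
# contains one CSV part file (plus Spark's bookkeeping files).
df.coalesce(1).write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .save(outputPath + "/single_part_output")

# Repartitioning to a fixed number of partitions produces that many
# part files, at the cost of a full shuffle.
df.repartition(4).write.format("csv").mode("overwrite").save(outputPath + "/four_part_output")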
Apache Spark Syntax for Reading Data
Reading and writing data in Spark is a basic task that is the starting point for any form of big data processing. Here's the core syntax for reading and writing data in Spark.
DataFrameReader.format(...).option("key", "value").schema(...).load()
DataFrameReader is the foundation for reading data in Spark. It can be accessed via the attribute spark.read, and its main pieces, combined in the example after this list, are:
- Format: Specifies the file format, such as CSV, JSON or Parquet. The default is Parquet.
- Option: A set of key-value configurations to parameterize how to read data.
- Schema: Optional; used to supply an explicit schema for the data source instead of having Spark infer it.
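Putting the three pieces together, a combined read might look like the following sketch, where the schema fields and filePath are placeholders:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Hypothetical two-column schema
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
])

df = (spark.read
      .format("csv")                # file format
      .option("header", "true")     # key-value reader option
      .schema(schema)               # explicit schema instead of inference
      .load(filePath))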
Read Modes
Often while reading data from external sources, we encounter corrupt data. Read modes instruct Spark to handle corrupt data in a specific way.
There are three typical read modes, and the default read mode is permissive. The mode is set with the reader's mode option, as shown in the example after this list.
- Permissive: All fields of a corrupted record are set to null, and the raw record is placed in a string column called _corrupt_record.
- dropMalformed: Drops all rows containing corrupt records.
- failFast: Fails as soon as corrupt records are encountered.
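For example, here's a minimal sketch of setting the read mode on a CSV read, reusing the filePath from the earlier examples:

# Drop rows that cannot be parsed instead of keeping them
df_drop = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .load(filePath)

# Fail the read as soon as a corrupt record is encountered
df_strict = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .load(filePath)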
Apache Spark Syntax for Writing Data
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
The foundation for writing data in Spark is the DataFrameWriter, which is accessed per DataFrame using the attribute dataFrame.write.
Save modes specify what will happen if Spark finds data already at the destination.
There are four typical save modes, and the default mode is errorIfExists. The mode is set with dataFrame.write.mode(), as shown in the example after this list.
- Append: Adds the output as new files alongside the data that already exists at the destination.
- Overwrite: Completely overwrites any data present at the destination.
- errorIfExists: Spark throws an error if data already exists at the destination.
- Ignore: If data already exists at the destination, do nothing with the dataFrame.
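For example, a minimal sketch of switching save modes, using the same df and outputPath as before:

# Add the new output alongside whatever already exists at the destination
df.write.format("csv").mode("append").save(outputPath)

# Silently skip the write if data already exists at the destination
df.write.format("csv").mode("ignore").save(outputPath)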
Knowing these read and save modes will help you understand what causes some of the errors you may encounter early on when using Apache Spark. Now, let's expand on this knowledge by diving into some of the frequently encountered file types and how to handle them.
How to Read and Write JSON Files in Apache Spark
Reading JSON isn't that much different from reading CSV files: you can either read using inferSchema or define your own schema.
df=spark.read.format("json").option("inferSchema”,"true").load(filePath)
Here, we read the JSON file by asking Spark to infer the schema. We only need one job even while inferring the schema because there is no header in JSON. The column names are extracted from the JSON object’s attributes.
To maintain consistency we can always define a schema to be applied to the JSON data being read.
jsonSchema = StructType([...])
df=spark.read.format("json").schema(jsonSchema).load(filePath)
Remember that JSON files can be nested. For a small file, manually creating the schema may not be worth the effort, but for a larger file it's the better option, as opposed to the long and expensive schema-inference process.
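As a sketch of what a nested schema can look like, here's a hypothetical example for JSON records containing a nested address object and a tags array (the field names are placeholders):

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType)

# Hypothetical records: {"id": 1, "name": "...", "address": {"city": "...", "zip": "..."}, "tags": ["..."]}
jsonSchema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zip", StringType(), True),
    ]), True),
    StructField("tags", ArrayType(StringType()), True),
])

df = spark.read.format("json").schema(jsonSchema).load(filePath)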
How to Write a JSON File in Apache Spark
As you would expect, writing a JSON file is identical to a CSV file.
df.write.format("json").mode("overwrite).save(outputPath/file.json)
Again, as with writing to a CSV, the data set is split into many files reflecting the number of partitions in the dataFrame.
How to Read and Write Parquet Files in Apache Spark
Apache Parquet is a free and open-source columnar storage format that provides efficient data compression and plays a pivotal role in Spark big data processing.
How to Read Data From Parquet Files
Unlike CSV and JSON files, a Parquet "file" is actually a collection of files: the bulk of them contain the actual data, and a few contain the metadata.
To read a Parquet file, we can use either of the two variations of the syntax shown below; both perform the same action.
#option1
df=spark.read.format("parquet").load(parquetDirectory)
#option2
df=spark.read.parquet(parquetDirectory)
As you'll notice, we don't need to specify any kind of schema: the column names and data types are stored in the Parquet files themselves.
The schema inference process is not as expensive as it is for CSV and JSON, since the Parquet reader only needs to process the small metadata files to determine the schema rather than scan the whole data set.
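You can confirm that the column names and types came from the Parquet metadata by printing the schema; a quick sketch:

df = spark.read.parquet(parquetDirectory)
df.printSchema()  # names and types are read from the Parquet file footers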
How to Write Data to Parquet Files
Writing Parquet is as easy as reading it. Simply specify the location for the file to be written.
df.write.format("parquet").mode("overwrite").save(outputPath)
The same partitioning rules we defined for CSV and JSON apply here as well.
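If you want more control over the on-disk layout, DataFrameWriter.partitionBy splits the output into one sub-directory per distinct value of a column. A minimal sketch, assuming a hypothetical country column:

# Produces sub-directories such as country=US/ and country=DE/,
# each containing the Parquet part files for that value.
df.write.format("parquet") \
    .mode("overwrite") \
    .partitionBy("country") \
    .save(outputPath)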
How to Read and Write Delta Files in Apache Spark
Delta Lake is a project initiated by Databricks that is now open source. It's a storage layer that helps you build a data lake consisting of one or more tables in Delta Lake format.
It is an open format based on Parquet that brings ACID transactions to a data lake, along with other handy features aimed at improving the reliability, quality and performance of existing data lakes.
In order to understand how to read from Delta format, it would make sense to first create a delta file.
How to Write Data in Delta
In order to create a Delta file, you must have a dataFrame with some data to be written. Once you have that, creating a Delta file is as easy as changing the file type while performing a write: instead of parquet, simply specify delta.
someDataFrame.write.format("delta").partitionBy("someColumn").save(path)
How to Read Data From Delta
If Delta files already exist you can directly run queries using Spark SQL on the directory of delta using the following syntax:
SELECT * FROM delta.`/path/to/delta_directory`
In most cases, you would want to create a table using the Delta files and operate on it using SQL. The notation is CREATE TABLE ... USING DELTA LOCATION:
spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")
spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format(/path/to/delta_directory))
This is called an unmanaged table in Spark SQL. It now serves as an interface between Spark and the data in the storage layer. Any changes made to this table will be reflected in the files and vice-versa. Once the table is created you can query it like any SQL table.
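For instance, a quick sketch of querying the table through Spark SQL:

# Query the unmanaged Delta table like any other SQL table
spark.sql("SELECT COUNT(*) AS row_count FROM delta_table_name").show()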
Apart from writing a dataFrame out in Delta format, we can perform other batch operations like append and merge on Delta tables, some of the most common operations in big data processing pipelines.
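As a rough sketch of what those operations can look like (this assumes the delta-spark Python package is installed, and the key column id as well as the DataFrames newDataFrame and updatesDataFrame are hypothetical):

from delta.tables import DeltaTable

# Append: add new records to an existing Delta directory
newDataFrame.write.format("delta").mode("append").save(path)

# Merge (upsert): update matching rows and insert new ones,
# keyed on a hypothetical "id" column
deltaTable = DeltaTable.forPath(spark, path)
(deltaTable.alias("target")
    .merge(updatesDataFrame.alias("updates"), "target.id = updates.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())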
Frequently Asked Questions
What are the two ways to read CSV files in Spark?
There are two common ways to read CSV files in Spark:
- inferSchema:
df=spark.read.format("csv").option("inferSchema","true").load(filePath)
- Custom Schema:
csvSchema = StructType([StructField("id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)
What is the syntax for reading and writing data in Spark?
- Spark uses DataFrameReader to read data: DataFrameReader.format(...).option("key", "value").schema(...).load()
- It uses DataFrameWriter to write data: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()