A Guide to Reading and Writing CSV Files and More in Apache Spark

Apache Spark is a big data processing framework that can be used to read and write data for CSV, JSON, Parquet and Delta Lake. Here’s how.

Written by Prashanth Xavier
Published on Aug. 06, 2024

Apache Spark is a popular big data processing framework that can be seamlessly integrated with cloud data platforms like Azure, AWS and GCP. Here’s how to read and write data in Spark for CSV, JSON, Parquet and Delta Lake formats.

How to Read a CSV File in Spark

To read CSV in Spark, first create a DataFrameReader and set a number of options: df=spark.read.format("csv").option("header","true").load(filePath)

Then, choose to read CSV files in Spark using either inferSchema or a custom schema:

  •  inferSchema: df=spark.read.format("csv").option("inferSchema","true").load(filePath)
  • Custom Schema: csvSchema = StructType([StructField("id",IntegerType(),False)])
    df=spark.read.format("csv").schema(csvSchema).load(filePath)
Apache Spark cheat sheet. | Image: Prashanth Xavier.

 

Spark Read CSV Format Syntax

To read a CSV file you must first create a DataFrameReader and set a number of options.

df=spark.read.format("csv").option("header","true").load(filePath)

Here, we load a CSV file and tell Spark that the file contains a header row. This step is guaranteed to trigger a Spark job. A Spark job is a block of parallel computation that executes a task. A job is triggered every time we are physically required to touch the data. In this case, the DataFrameReader has to peek at the first line of the file to figure out how many columns of data we have in the file.

When reading data, you always need to consider the overhead of data types. There are two ways to handle this in Spark: inferSchema or a custom schema.

Reading CSV Using InferSchema

df=spark.read.format("csv").option("inferSchema","true").load(filePath)

The inferSchema option tells the reader to infer data types from the source file. This requires an additional pass over the file, so two Spark jobs are triggered. It’s an expensive operation because Spark must go through the CSV file and infer the data type of each column.

Reading CSV Using a Custom Schema

The preferred option while reading any file would be to enforce a custom schema. This ensures that the data types are consistent and avoids any unexpected behavior.

To do that, first declare the schema to be enforced, and then read the data by setting the schema option.

from pyspark.sql.types import StructType, StructField, IntegerType
csvSchema = StructType([StructField("id",IntegerType(),False)])
df=spark.read.format("csv").schema(csvSchema).load(filePath)

As a result of predefining the schema for your data, you avoid triggering any jobs. Spark didn’t peek into the file because we took care of the schema. This fits with lazy evaluation, a crucial optimization technique in Spark: work is deferred until an action actually needs the data.

How to Write CSV Data in Apache Spark

Writing data in Spark is fairly simple. To write out data, we need a DataFrame with actual data in it, through which we can access the DataFrameWriter.

df.write.format("csv").mode("overwrite").save("/outputPath/file.csv")

Here, we write the contents of the data frame into a CSV file. Setting the write mode to overwrite will completely overwrite any data that already exists in the destination.

We might expect the previous command to produce a single CSV file. Instead, the path we intended to write to is a folder with numerous files in it. You can confirm this by peeking into the contents of outputPath.

%fs ls /outputPath/file.csv

This is an important aspect of Spark’s distributed engine: the number of files reflects the number of partitions in the DataFrame at the time it is written out. The number of files generated would be different if we had repartitioned the DataFrame before writing it out.

Partitioning simply means dividing a large data set into smaller chunks, or partitions. In Spark, partitions are the basic units of parallelism, and partitioning lets you control where data is stored as you write it.

A tutorial on how to read CSV files in Spark. | Video: DataEdge Learning


 

Apache Spark Syntax for Reading Data

Reading and writing data in Spark is a basic task and the starting point for any form of big data processing. Here’s the core syntax for reading and writing data in Spark.

DataFrameReader.format(...).option("key", "value").schema(...).load()

DataFrameReader is the foundation for reading data in Spark. It can be accessed via the attribute spark.read.

  • Format: Specifies the file format, such as CSV, JSON or Parquet. The default is Parquet.
  • Option: A set of key-value configurations to parameterize how to read data.
  • Schema: Optional. Use it to supply an explicit schema instead of having Spark infer one from the data source.

Read Modes

Often while reading data from external sources, we encounter corrupt data. Read modes instruct Spark to handle corrupt data in a specific way.

There are three typical read modes and the default read mode is permissive.

  1. Permissive: Malformed fields are set to null, and the corrupted record is placed in a string column called _corrupt_record.
  2. dropMalformed: Drops all rows containing corrupt records.
  3. failFast: Fails when corrupt records are encountered.

 

Apache Spark Syntax for Writing Data

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

The foundation for writing data in Spark is the DataFrameWriter, which is accessed per DataFrame using the attribute dataFrame.write.

Save modes specify what will happen if Spark finds data already at the destination.

There are four typical save modes, and the default mode is errorIfExists:

  • Append: Appends output data to files that already exist.
  • Overwrite: Completely overwrites any data present at the destination.
  • errorIfExists: Spark throws an error if data already exists at the destination.
  • Ignore: If data already exists at the destination, Spark does nothing with the DataFrame.

This will help you understand what causes errors that you may encounter early on using Apache Spark. Now, let’s expand on this knowledge by diving into some of the frequently encountered file types and how to handle them.

 

How to Read and Write JSON Files in Apache Spark

Reading JSON isn’t that much different from reading CSV files. You can either read using inferSchema or define your own schema.

df=spark.read.format("json").option("inferSchema","true").load(filePath)

Here, we read the JSON file by asking Spark to infer the schema. We only need one job even while inferring the schema because there is no header in JSON. The column names are extracted from the JSON object’s attributes.

To maintain consistency we can always define a schema to be applied to the JSON data being read.

jsonSchema = StructType([...])

df=spark.read.format("json").schema(jsonSchema).load(filePath)

Remember that JSON files can be nested. For a small file, manually creating the schema may not be worth the effort, but for a larger file it’s the better option compared with the long and expensive schema inference process.

How to Write a JSON File in Apache Spark

As you would expect, writing a JSON file is nearly identical to writing a CSV file.

df.write.format("json").mode("overwrite").save("/outputPath/file.json")

Again, as with writing to a CSV, the data set is split into many files reflecting the number of partitions in the dataFrame.

 

How to Read and Write Parquet Files in Apache Spark

Apache Parquet is a free, open-source columnar storage format that provides efficient data compression and plays a pivotal role in Spark big data processing.

How to Read Data From Parquet Files

Unlike CSV and JSON files, a Parquet “file” is actually a collection of files. The bulk of them contain the actual data, and a few comprise metadata.

To read a Parquet file, we can use either of the two syntax variations shown below. Both perform the same action.

#option1

df=spark.read.format("parquet").load(parquetDirectory)

#option2

df=spark.read.parquet(parquetDirectory)

As you can see, we don’t need to specify any kind of schema. The column names and data types are stored in the Parquet files themselves.

The schema inference process is not as expensive as it is for CSV and JSON, since the Parquet reader needs to process only the small-sized meta-data files to implicitly infer the schema rather than the whole file.

How to Write Data to Parquet Files

Writing Parquet is as easy as reading it. Simply specify the location for the file to be written.

df.write.format("parquet").mode("overwrite").save("outputPath")

The same partitioning rules we defined for CSV and JSON apply here.


 

How to Read and Write Delta Files in Apache Spark

Delta Lake is a project initiated by Databricks that is now open source. It is a storage layer that helps you build a data lake consisting of one or more tables in Delta Lake format.

It is an open format based on Parquet that brings ACID transactions to a data lake, along with other handy features aimed at improving the reliability, quality and performance of existing data lakes.

In order to understand how to read from Delta format, it would make sense to first create a delta file.

How to Write Data in Delta

To create a Delta file, you must have a DataFrame with some data to be written. Once you have that, creating a Delta table is as easy as changing the file type while performing a write. Instead of parquet, simply write delta.

someDataFrame.write.format("delta").partitionBy("someColumn").save(path)

How to Read Data From Delta

If Delta files already exist, you can run queries directly on the Delta directory with Spark SQL, using the following syntax:

SELECT * FROM delta.`/path/to/delta_directory`

In most cases, you would want to create a table using Delta files and operate on it using SQL. The notation is: CREATE TABLE USING DELTA LOCATION

spark.sql(""" DROP TABLE IF EXISTS delta_table_name""")

spark.sql(""" CREATE TABLE delta_table_name USING DELTA LOCATION '{}' """.format("/path/to/delta_directory"))

This is called an unmanaged table in Spark SQL. It now serves as an interface between Spark and the data in the storage layer. Any changes made to this table will be reflected in the files and vice-versa. Once the table is created you can query it like any SQL table.

Apart from writing a DataFrame in Delta format, we can perform other batch operations like Append and Merge on Delta tables. These are routine operations in big data processing pipelines.

Frequently Asked Questions

There are two common ways to read CSV files in Spark:

  • inferSchema: df=spark.read.format("csv").option("inferSchema","true").load(filePath)
  • Custom Schema: csvSchema = StructType([StructField("id",IntegerType(),False)])
    df=spark.read.format("csv").schema(csvSchema).load(filePath)

  • Spark uses DataFrameReader to read data: DataFrameReader.format(...).option("key", "value").schema(...).load()
  • It uses DataFrameWriter to write data: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()