
Tutorial: Load and transform data using Apache Spark DataFrames

This tutorial shows you how to load and transform data using the Apache Spark Python (PySpark) DataFrame API and the Apache Spark Scala DataFrame API in Azure Databricks.

By the end of this tutorial, you will understand what a DataFrame is and be familiar with the following tasks:

Python

See also Apache Spark PySpark API reference.

Scala

See also Apache Spark Scala API reference.

What is a DataFrame?

A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames provide a rich set of functions (select columns, filter, join, aggregate) that allow you to solve common data analysis problems efficiently.

Apache Spark DataFrames are an abstraction built on top of Resilient Distributed Datasets (RDDs). Spark DataFrames and Spark SQL use a unified planning and optimization engine, allowing you to get nearly identical performance across all supported languages on Azure Databricks (Python, SQL, Scala, and R).
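For example, a minimal PySpark sketch (assuming an active spark session, as in an Azure Databricks notebook) shows a few of these functions chained together. The data and column names here are illustrative only.

Python

    # A minimal sketch: create an illustrative DataFrame, then filter and aggregate it.
    people = spark.createDataFrame(
        [("Alice", 34), ("Bob", 45), ("Alice", 29)],
        schema="name STRING, age INT",
    )
    # Chain filter, groupBy, and count, then render the result in the notebook.
    display(people.filter(people["age"] > 30).groupBy("name").count())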

Requirements

To complete this tutorial, you must meet the following requirements:

  • Your workspace must have Unity Catalog enabled. Most new workspaces have Unity Catalog enabled by default. For information, see Set up and manage Unity Catalog.
  • You must have permission to use an existing all-purpose compute resource or create a new one. See Use compute.
  • You must have the USE CATALOG, USE SCHEMA, WRITE VOLUME, and CREATE TABLE privileges on the target catalog, schema, and volume.
  • You have an existing Unity Catalog schema and volume. If not, you must have privileges to create them. For more information on working with Unity Catalog schemas and volumes, see What are Unity Catalog volumes? and What are schemas in Azure Databricks?.

To set these permissions, consult your Databricks administrator or see Unity Catalog privileges and securable objects.
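For reference, an administrator could grant these privileges with statements similar to the following sketch. The catalog, schema, volume, and principal names below are placeholders, not values from this tutorial; adjust them for your environment.

Python

    # Hypothetical GRANT statements run by an administrator; all names are placeholders.
    spark.sql("GRANT USE CATALOG ON CATALOG main TO `user@example.com`")
    spark.sql("GRANT USE SCHEMA, CREATE TABLE ON SCHEMA main.default TO `user@example.com`")
    spark.sql("GRANT WRITE VOLUME ON VOLUME main.default.my_volume TO `user@example.com`")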

Important

Before continuing, you need the names of the Unity Catalog catalog, schema, and volume that you will use in this notebook. In many cases, you will use an existing catalog, but create and use a schema and volume dedicated for use with various tutorials (such as Get started: Import and visualize CSV data from a notebook).

Step 1: Create a new notebook

To create a notebook in your workspace, click New in the sidebar, and then click Notebook. A blank notebook opens in the workspace.

  1. Enter a name for your notebook.
  2. Choose the default language for your notebook.
  3. Click Connect and select an all-purpose compute resource (if you are writing in Python and it is available, select Serverless). For information on connecting to compute resources, see Use compute.

Important

Scala is not supported with serverless compute. Select a different compute type if you would like to use Scala for this tutorial.

To learn more about creating and managing notebooks, see Manage notebooks.

Step 2: Define variables

In this step, you define variables for use in this notebook.

  1. Copy and paste the following code into the new empty notebook cell. Replace <catalog_name>, <schema_name>, and <volume_name> with the catalog, schema, and volume names you want to use in this tutorial. For information on working with Unity Catalog schemas and volumes, see What are Unity Catalog volumes? and What are schemas in Azure Databricks?.

    Python

    catalog = "<catalog_name>" # replace with the catalog name in your environment
    schema = "<schema_name>" # replace with the schema name in your environment
    volume = "<volume_name>" # replace with the volume name in your environment
    file_name = "Baby_Names__Beginning_2007_20240627.csv"
    table_name = "baby_names"
    path_volume = f"/Volumes/{catalog}/{schema}/{volume}/raw_data"
    path_table = f"{catalog}.{schema}"
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>" // replace with the catalog name in your environment
    val schema = "<schema_name>" // replace with the schema name in your environment
    val volume = "<volume_name>" // replace with the volume name in your environment
    val fileName = "Baby_Names__Beginning_2007_20240627.csv"
    val tableName = "baby_names"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume/raw_data"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    
  2. Press Shift+Enter to run the cell and create a new blank cell.

Important

The remainder of the code blocks in this article depend on the values defined by these variables. If the catalog, schema, or volume that you specify doesn't exist, subsequent code blocks fail with an error such as NO_SUCH_CATALOG_EXCEPTION. If you receive this error, verify the catalog, schema, and volume names and then retry the command.
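As an optional check before continuing, you can list the catalogs visible to you and, if you have the required privileges, create the schema and volume. This sketch assumes you have already run the variable definitions from Step 2.

Python

    # Optional: confirm the catalog is visible, then create the schema and volume
    # if they don't already exist (requires the appropriate privileges).
    display(spark.sql("SHOW CATALOGS"))
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{schema}.{volume}")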

Step 3: Copy CSV file into Unity Catalog volume

In this step, you download the New York State baby names data from a public website and then upload it to a new directory in your Unity Catalog volume.

  1. Using your browser, navigate to health.data.ny.gov to view information about the New York State baby names dataset.

  2. Click Export and then click Download to save the CSV file to your local file system.

    The Baby_Names__Beginning_2007_20240627.csv file appears in the file system in the Downloads folder.

  3. Click New in your workspace sidebar and click Add or upload data.

  4. On the Add data page, click Upload files to volume.

  5. In the Path text box, enter the full path for a new raw_data directory in your volume into which to load the CSV file. The path follows the pattern /Volumes/<catalog>/<schema>/<volume>/raw_data; replace the bracketed elements with the values for your Unity Catalog environment.

  6. Click Hide catalogs.

  7. In the Data area, click Browse to navigate to the location into which you downloaded the CSV file, select the Baby_Names__Beginning_2007_20240627.csv file and then click Open.

  8. Click Upload.

Step 4: Create a DataFrame

This step creates a DataFrame named df1 with test data and then displays its contents.

  1. Copy and paste the following code into the new empty notebook cell. This code creates the DataFrame with test data, and then displays the contents and the schema of the DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Step 5: Load data into a DataFrame from a CSV file

This step creates a DataFrame named df_csv from the CSV file that you previously loaded into your Unity Catalog volume. See spark.read.csv.

  1. Copy and paste the following code into the new empty notebook cell. This code loads baby name data into DataFrame df_csv from the CSV file and then displays the contents of the DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

You can load data from many supported file formats.
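For example, reading Parquet or JSON uses the same reader with a different format method. The file names below are hypothetical and only illustrate the shape of the API; they assume you have uploaded such files to the same volume path.

Python

    # Hypothetical file names; these calls assume such files exist in your volume.
    df_parquet = spark.read.parquet(f"{path_volume}/example_data.parquet")
    df_json = spark.read.json(f"{path_volume}/example_data.json")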

Step 6: View and interact with your DataFrame

View and interact with your baby names DataFrames using the following methods.

Learn how to display the schema of an Apache Spark DataFrame. Apache Spark uses the term schema to refer to the names and data types of the columns in the DataFrame.

Note

Azure Databricks also uses the term schema to describe a collection of tables registered to a catalog.

  1. Copy and paste the following code into an empty notebook cell. This code uses the .printSchema() method to display the schemas of the two DataFrames in preparation for combining them with a union.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Rename column in the DataFrame

Learn how to rename a column in a DataFrame.

  1. Copy and paste the following code into an empty notebook cell. This code renames a column in the df_csv DataFrame to match the corresponding column in the df1 DataFrame. This code uses the Apache Spark withColumnRenamed() method.

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema()
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Combine DataFrames

Learn how to create a new DataFrame that adds the rows of one DataFrame to another.

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark union() method to combine the contents of your first DataFrame df1 with the DataFrame df_csv that contains the baby names data loaded from the CSV file.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Filter rows in a DataFrame

Discover the most popular baby names in your data set by filtering rows, using the Apache Spark .filter() or .where() methods. Use filtering to select a subset of rows to return or modify in a DataFrame. The .where() method is an alias for .filter(), so there is no difference in performance or behavior, as the following examples show.

Using .filter() method

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark .filter() method to display the rows in the DataFrame with a count of more than 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Using .where() method

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark .where() method to display the rows in the DataFrame with a count of more than 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Select columns from a DataFrame and order by frequency

Learn which baby names are most frequent by using the select() method to specify the columns to return from the DataFrame. Use the Apache Spark orderBy() and desc() functions to order the results.

The pyspark.sql module for Apache Spark provides support for SQL functions. Among the functions used in this tutorial are the Apache Spark orderBy(), desc(), and expr() functions. You enable the use of these functions by importing them into your session as needed.

  1. Copy and paste the following code into an empty notebook cell. This code imports the desc() function and then uses the Apache Spark select() method and Apache Spark orderBy() and desc() functions to display the most common names and their counts in descending order.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Create a subset DataFrame

Learn how to create a subset DataFrame from an existing DataFrame.

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark filter method to create a new DataFrame restricting the data by year, count, and sex. It uses the Apache Spark select() method to limit the columns. It also uses the Apache Spark orderBy() and desc() functions to sort the new DataFrame by count.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Step 7: Save the DataFrame

Discover how to save your DataFrame. You can save it to a table or write it to one or more files.

Save the DataFrame to a table

Azure Databricks uses the Delta Lake format for all tables by default. To save your DataFrame, you must have CREATE TABLE privileges on the catalog and schema.

  1. Copy and paste the following code into an empty notebook cell. This code saves the contents of the DataFrame to a table using the variable you defined at the start of this tutorial.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable.$tableName")
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Most Apache Spark applications work on large data sets and in a distributed fashion. Apache Spark writes out a directory of files rather than a single file. Delta Lake splits the Parquet folders and files. Many data systems can read these directories of files. Azure Databricks recommends using tables over file paths for most applications.
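If you do want to write files instead of a table, a minimal sketch might write the DataFrame back to your volume. The output directory name below is an assumption, not something defined earlier in this tutorial.

Python

    # Writes a directory of CSV part files to a hypothetical subdirectory of your volume.
    # Requires the WRITE VOLUME privilege on the volume.
    df.write.mode("overwrite").option("header", "true").csv(f"{path_volume}/baby_names_output")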

Additional tasks: Run SQL queries in PySpark, Scala, and R

Apache Spark DataFrames provide the following options to combine SQL with PySpark, Scala, and R. You can run the following code in the same notebook that you created for this tutorial.

Specify a column as a SQL query

Learn how to use the Apache Spark selectExpr() method. This is a variant of the select() method that accepts SQL expressions and returns an updated DataFrame. This method allows you to use a SQL expression, such as upper.

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark selectExpr() method and the SQL upper expression to convert a string column to upper case (and rename the column).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Use expr() to use SQL syntax for a column

Learn how to import and use the Apache Spark expr() function to use SQL syntax anywhere a column would be specified.

  1. Copy and paste the following code into an empty notebook cell. This code imports the expr() function and then uses the Apache Spark expr() function and the SQL lower expression to convert a string column to lower case (and rename the column).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

Run an arbitrary SQL query using spark.sql() function

Learn how to use the Apache Spark spark.sql() function to run arbitrary SQL queries.

  1. Copy and paste the following code into an empty notebook cell. This code uses the Apache Spark spark.sql() function to query a SQL table using SQL syntax.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    
  2. Press Shift+Enter to run the cell and then move to the next cell.

DataFrame tutorial notebooks

The following notebooks include the example queries from this tutorial.

Python

DataFrames tutorial using Python

Get notebook

Scala

DataFrames tutorial using Scala

Get notebook

Additional resources