Поделиться через


Руководство: Загрузка и преобразование данных с помощью DataFrame Apache Spark

В этом руководстве показано, как загружать и преобразовывать данные с помощью API Кадра данных Apache Spark (PySpark), API Apache Spark Scala DataFrame и API SparkR SparkDataFrame в Azure Databricks.

Примечание.

Если вы используете Databricks Free Edition, выберите вкладку Python для всех примеров кода в этом руководстве. Free Edition не поддерживает R или Scala. Кроме того, Free Edition ограничивает исходящий доступ к Интернету, поэтому необходимо отправить CSV-файл с помощью пользовательского интерфейса рабочей области, а не скачать его с кодом. Подробные инструкции см. в шаге 1 .

В конце этого руководства вы узнаете, что такое DataFrame, и ознакомитесь со следующими задачами:

Питон

См. также справочник по API Apache Spark PySpark.

язык программирования Scala

См. также справочник по API Apache Spark Scala.

Р

См. также справочник по API Apache SparkR.

Что такое кадр данных?

Кадр данных — это двухмерная структура данных с столбцами потенциально разных типов. Вы можете представить DataFrame как электронную таблицу, таблицу SQL или словарь объектов Series. DataFrame Apache Spark предоставляют широкий набор функций (выбор столбцов, фильтрация, соединение, агрегирование), которые позволяют эффективно решать типичные задачи анализа данных.

DataFrame в Apache Spark — это абстракция, построенная поверх Resilient Distributed Datasets (RDD). Кадры данных Spark и Spark SQL используют единый механизм планирования и оптимизации, что позволяет получить почти одинаковую производительность на всех поддерживаемых языках в Azure Databricks (Python, SQL, Scala и R).

Требования

Чтобы выполнить следующее руководство, необходимо выполнить следующие требования:

  • Чтобы воспользоваться примерами, приведенными в этом руководстве, в рабочей области должна быть включена функция Unity Catalog. Бесплатная версия Azure Databricks и рабочие области бесплатной пробной версии по умолчанию включают Unity Catalog.

  • В примерах этого руководства используется тома каталога Unity для хранения примеров данных. Чтобы использовать эти примеры, создайте том и используйте каталог, схему и имена томов, чтобы задать путь тома, используемый примерами. Пользователи Free Edition имеют доступ к каталогу рабочей области и схеме default по умолчанию.

  • У вас должны быть следующие разрешения в каталоге Unity:

    • READ VOLUME и WRITE VOLUME используемый в этом руководстве том
    • USE SCHEMA схема, используемая для этого руководства
    • USE CATALOG для каталога, используемого для этого руководства

    Чтобы задать эти разрешения, ознакомьтесь с правами администратора Azure Databricks или каталога Unity и защищаемыми объектами. Пользователи Free Edition имеют эти привилегии в каталоге рабочей области и default схеме по умолчанию.

Совет

Для полного блокнота к этой статье см. учебники DataFrame.

Шаг 1. Определение переменных и загрузка CSV-файла

Этот шаг определяет переменные для использования в этом руководстве, а затем загружает CSV-файл, содержащий данные имени ребенка из health.data.ny.gov в ваш том Unity Catalog. Вам нужны имена каталога Unity Catalog, схемы и тома.

Совет

Если вы не знаете имена каталога и схемы, щелкните значок данных.Каталог на боковой панели. Каталог рабочей области носит то же имя, что и ваша рабочая область, и отображается на панели каталога. Разверните его, чтобы просмотреть доступные схемы. Бесплатный выпуск и пользователи, использующие бесплатную пробную версию, могут использовать каталог рабочей области и схему default.

Если у вас нет тома, создайте его, выполнив следующую команду в ячейке записной книжки (замените <catalog_name> и <schema_name> своими значениями):

CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
  1. Откройте новую записную книжку, нажмите на значок . Сведения о том, как перемещаться по записным книжкам Azure Databricks, см. в статье Настройка внешнего вида записной книжки.

  2. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Замените <catalog-name>, <schema-name>и <volume-name> каталогом, схемой и именами томов для тома каталога Unity. Замените <table_name> на имя таблицы по вашему выбору. Вы загружаете данные имени ребенка в эту таблицу далее в этом руководстве.

    Питон

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    язык программирования Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    Р

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Нажмите Shift+Enter , чтобы запустить ячейку и создать пустую ячейку.

  4. Загрузите CSV-файл в том диска. Выберите для этого один из следующих методов:

    • Отправка с помощью пользовательского интерфейса рабочей области — используйте этот метод, если вы находитесь в Databricks Free Edition или если скачивание кода в параметре B завершается ошибкой сети. Бесплатная версия и другие бессерверные вычислительные среды ограничивают исходящий доступ в Интернет, поэтому необходимо загрузить файл с локального компьютера.
    • Скачайте с помощью кода — используйте этот метод, если в вычислительной среде есть исходящий доступ к Интернету.

    Вариант A. Отправка с помощью пользовательского интерфейса рабочей области

    1. На локальном компьютере откройте health.data.ny.gov/api/views/jxy9-yhdk/rows.csv в браузере. Файл загружается на компьютер как rows.csv, который соответствует переменной, определенной file_name ранее.
    2. Вернитесь в рабочую область Azure Databricks. На боковой панели нажмите новую иконку> Добавьте или загрузите данные.
    3. Нажмите «Загрузить файлы в том».
    4. Щелкните Просмотреть и выберите rows.csv файл или перетащите его в область загрузки.
    5. В разделе "Целевой том" выберите указанный выше том.
    6. После завершения отправки вернитесь в записную книжку и перейдите к шагу 2.

    Дополнительные сведения о отправке файлов см. в разделе "Отправка файлов в том каталога Unity".

    Вариант B. Скачивание с помощью кода

    Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код копирует rows.csv файл из health.data.ny.gov в том каталога Unity с помощью команды Databricks dbutils . Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

    Питон

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    язык программирования Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    Р

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Шаг 2: Создание фрейма данных

На этом шаге создается кадр данных с именем df1, содержащий тестовые данные, а затем отображается его содержимое.

  1. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код создает кадр данных с тестируемыми данными, а затем отображает содержимое и схему кадра данных.

    Питон

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    # highlight-next-line
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    язык программирования Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    // highlight-next-line
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Р

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    # highlight-next-line
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Шаг 3: Загрузите данные в таблицу данных из CSV-файла

На этом шаге создается DataFrame с именем df_csv из CSV-файла, который вы ранее загружали в объем Unity Catalog. См. spark.read.csv

  1. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код загружает данные имени ребенка в кадр данных df_csv из CSV-файла, а затем отображает содержимое кадра данных.

    Питон

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    язык программирования Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    Р

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Данные можно загрузить из многих поддерживаемых форматов файлов.

Шаг 4. Просмотр и взаимодействие с DataFrame

Просматривайте и взаимодействуйте с DataFrame (структура данных) по именам детей, используя следующие методы.

Узнайте, как отобразить схему DataFrame в Apache Spark. Apache Spark использует термин схема, чтобы обозначить имена и типы данных столбцов в DataFrame.

Примечание.

Azure Databricks также использует схему терминов для описания коллекции таблиц, зарегистрированных в каталоге.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. В этом коде показана схема кадров данных с помощью метода .printSchema() для просмотра схем двух кадров данных для подготовки к объединению двух кадров данных.

    Питон

    df_csv.printSchema()
    df1.printSchema()
    

    язык программирования Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    Р

    printSchema(df_csv)
    printSchema(df1)
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Переименовать столбец в DataFrame

Узнайте, как переименовать столбец в DataFrame.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код переименовывает столбец в кадре данных df1_csv для сопоставления соответствующего столбца в df1 кадре данных. Этот код использует метод Apache Spark withColumnRenamed() .

    Питон

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema()
    

    язык программирования Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    Р

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Объединение DataFrame

Узнайте, как создать новый кадр данных, который добавляет строки одного кадра данных в другой.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark union() для объединения содержимого вашего первого DataFrame df с DataFrame df_csv, содержащим данные о детских именах, загруженные из CSV-файла.

    Питон

    df = df1.union(df_csv)
    display(df)
    

    язык программирования Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    Р

    display(df <- union(df1, df_csv))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Фильтрация строк в DataFrame

Узнайте о самых популярных именах детей в наборе данных, отфильтровав строки, используя методы Apache Spark .filter() или .where(). Используйте фильтрацию, чтобы выбрать подмножество строк для возврата или изменения в DataFrame. Нет различий в производительности или синтаксисе, как показано в следующих примерах.

Использование метода .filter()

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark .filter() для отображения тех строк в DataFrame, где количество больше 50.

    Питон
    display(df.filter(df["Count"] > 50))
    
    язык программирования Scala
    display(df.filter(df("Count") > 50))
    
    Р
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Использование метода .where()

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark .where() для отображения тех строк в DataFrame, где количество больше 50.

    Питон
    display(df.where(df["Count"] > 50))
    
    язык программирования Scala
    display(df.where(df("Count") > 50))
    
    Р
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Выберите столбцы из таблицы данных и упорядочьте по частоте

Узнайте о том, какие имена для детей встречаются чаще, с помощью метода select(), чтобы указать столбцы DataFrame, которые необходимо вернуть. Используйте Apache Spark orderby и desc функции для упорядочивания результатов.

Модуль pyspark.sql для Apache Spark обеспечивает поддержку функций SQL. Среди функций, которые мы используем в этом руководстве, функции Apache Spark, orderBy(), desc() и expr(). Вы можете использовать эти функции, импортируя их в сеанс по мере необходимости.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует функцию desc(), и затем использует метод Apache Spark select() и функции Apache Spark orderBy() и desc() для отображения наиболее распространенных имен и их подсчетов в порядке убывания.

    Питон

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    язык программирования Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Р

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Создать подмножество DataFrame

Узнайте, как создать подмножество DataFrame из существующего DataFrame.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark filter для создания нового DataFrame, ограничивающего данные по годам, количеству и полу. Он использует метод Apache Spark select() для ограничения столбцов. Система также использует функции Apache Spark orderBy() и desc() для сортировки нового DataFrame по количеству.

    Питон

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    язык программирования Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    Р

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Шаг 5. Сохранение кадра данных

Узнайте, как сохранить DataFrame. Вы можете сохранить кадр данных в таблицу или записать кадр данных в файл или несколько файлов.

Сохраните DataFrame в таблицу.

Azure Databricks использует формат Delta Lake для всех таблиц по умолчанию. Чтобы сохранить DataFrame, необходимо иметь привилегии таблицы CREATE в каталоге и схеме.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет данные DataFrame в таблицу с использованием переменной, которую вы определили в начале этого руководства.

    Питон

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    язык программирования Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    Р

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Большинство приложений Apache Spark работают с большими объемами данных в распределенной среде. Apache Spark записывает каталог файлов, а не один файл. Delta Lake расщепляет папки и файлы Parquet. Многие системы данных могут считывать эти каталоги файлов. Azure Databricks рекомендует использовать таблицы по пути к файлам для большинства приложений.

Сохранение DataFrame в JSON файлы

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет кадр данных в каталог JSON-файлов.

    Питон

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    язык программирования Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Р

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Чтение кадра данных из JSON-файла

Узнайте, как использовать метод Apache Spark spark.read.format() для чтения данных JSON из каталога в кадр данных.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код отображает файлы JSON, сохраненные в предыдущем примере.

    Питон

    display(spark.read.format("json").json("/tmp/json_data"))
    

    язык программирования Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Р

    display(read.json("/tmp/json_data"))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Дополнительные задачи: выполнение запросов SQL в PySpark, Scala и R

Кадры данных Apache Spark предоставляют следующие возможности для объединения SQL с PySpark, Scala и R. Вы можете запустить следующий код в той же записной книжке, которую вы создали для этого руководства.

Указание столбца в виде SQL-запроса

Узнайте, как использовать метод Apache Spark selectExpr() . Это вариант select() метода, который принимает выражения SQL и возвращает обновленный кадр данных. Этот метод позволяет использовать выражение SQL, например upper.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод selectExpr() Apache Spark и выражение SQL upper для преобразования строкового столбца в верхний регистр (и переименования столбца).

    Питон

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    язык программирования Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Р

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Использование expr() для использования синтаксиса SQL для столбца

Узнайте, как импортировать и использовать функцию Apache Spark expr() для использования синтаксиса SQL в любом месте, где будет указан столбец.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует функцию expr(), а затем использует функцию Apache Spark expr() и выражение SQL lower для преобразования строкового столбца в нижний регистр (и переименования столбца).

    Питон

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    язык программирования Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    Р

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Выполнение произвольного SQL-запроса с помощью функции spark.sql()

Узнайте, как использовать функцию Apache Spark spark.sql() для выполнения произвольных запросов SQL.

  1. Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует функцию Apache Spark spark.sql() для запроса таблицы SQL с помощью синтаксиса SQL.

    Питон

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    язык программирования Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    Р

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

Учебные записные книжки DataFrame

В следующих записных книжках приведены примеры запросов из этого руководства.

Питон

Руководство по DataFrame на Python

Возьмите записную книжку

язык программирования Scala

Руководство по DataFrames на Scala

Возьмите записную книжку

Р

Руководство по DataFrames в R

Возьмите записную книжку

Дополнительные ресурсы